/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.extensions.SparkExtensionsTestBase;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class TestExpireSnapshotsProcedure
extends SparkExtensionsTestBase {
    public TestExpireSnapshotsProcedure(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    @After
    public void removeTables() {
        this.sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
    }

    @Test
    public void testExpireSnapshotsInEmptyTable() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        List output = this.sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, this.tableIdent});
        this.assertEquals("Should not delete any files", (List)ImmutableList.of((Object)this.row(new Object[]{0L, 0L, 0L, 0L, 0L, 0L})), output);
    }

    @Test
    public void testExpireSnapshotsUsingPositionalArgs() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot firstSnapshot = table.currentSnapshot();
        this.waitUntilAfter(firstSnapshot.timestampMillis());
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot secondSnapshot = table.currentSnapshot();
        Timestamp secondSnapshotTimestamp = Timestamp.from(Instant.ofEpochMilli(secondSnapshot.timestampMillis()));
        Assert.assertEquals((String)"Should be 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        List output1 = this.sql("CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')", new Object[]{this.catalogName, this.tableIdent, secondSnapshotTimestamp});
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), output1);
        table.refresh();
        Assert.assertEquals((String)"Should expire one snapshot", (long)1L, (long)Iterables.size((Iterable)table.snapshots()));
        this.sql("INSERT OVERWRITE %s VALUES (3, 'c')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (4, 'd')", new Object[]{this.tableName});
        this.assertEquals("Should have expected rows", (List)ImmutableList.of((Object)this.row(new Object[]{3L, "c"}), (Object)this.row(new Object[]{4L, "d"})), this.sql("SELECT * FROM %s ORDER BY id", new Object[]{this.tableName}));
        table.refresh();
        this.waitUntilAfter(table.currentSnapshot().timestampMillis());
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        Assert.assertEquals((String)"Should be 3 snapshots", (long)3L, (long)Iterables.size((Iterable)table.snapshots()));
        List output = this.sql("CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)", new Object[]{this.catalogName, this.tableIdent, currentTimestamp});
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{2L, 0L, 0L, 2L, 1L, 0L})), output);
    }

    @Test
    public void testExpireSnapshotUsingNamedArgs() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should be 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        this.waitUntilAfter(table.currentSnapshot().timestampMillis());
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        List output = this.sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, currentTimestamp, this.tableIdent});
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), output);
    }

    @Test
    public void testExpireSnapshotsGCDisabled() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'false')", new Object[]{this.tableName, "gc.enabled"});
        AssertHelpers.assertThrows((String)"Should reject call", ValidationException.class, (String)"Cannot expire snapshots: GC is disabled", () -> this.sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, this.tableIdent}));
    }

    @Test
    public void testInvalidExpireSnapshotsCases() {
        AssertHelpers.assertThrows((String)"Should not allow mixed args", AnalysisException.class, (String)"Named and positional arguments cannot be mixed", () -> this.sql("CALL %s.system.expire_snapshots('n', table => 't')", new Object[]{this.catalogName}));
        AssertHelpers.assertThrows((String)"Should not resolve procedures in arbitrary namespaces", NoSuchProcedureException.class, (String)"not found", () -> this.sql("CALL %s.custom.expire_snapshots('n', 't')", new Object[]{this.catalogName}));
        AssertHelpers.assertThrows((String)"Should reject calls without all required args", AnalysisException.class, (String)"Missing required parameters", () -> this.sql("CALL %s.system.expire_snapshots()", new Object[]{this.catalogName}));
        AssertHelpers.assertThrows((String)"Should reject calls with invalid arg types", AnalysisException.class, (String)"Wrong arg type", () -> this.sql("CALL %s.system.expire_snapshots('n', 2.2)", new Object[]{this.catalogName}));
        AssertHelpers.assertThrows((String)"Should reject calls with empty table identifier", IllegalArgumentException.class, (String)"Cannot handle an empty identifier", () -> this.sql("CALL %s.system.expire_snapshots('')", new Object[]{this.catalogName}));
    }

    @Test
    public void testResolvingTableInAnotherCatalog() throws IOException {
        String anotherCatalog = "another_" + this.catalogName;
        spark.conf().set("spark.sql.catalog." + anotherCatalog, SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog." + anotherCatalog + ".type", "hadoop");
        spark.conf().set("spark.sql.catalog." + anotherCatalog + ".warehouse", "file:" + this.temp.newFolder().toString());
        this.sql("CREATE TABLE %s.%s (id bigint NOT NULL, data string) USING iceberg", new Object[]{anotherCatalog, this.tableIdent});
        AssertHelpers.assertThrows((String)"Should reject calls for a table in another catalog", IllegalArgumentException.class, (String)"Cannot run procedure in catalog", () -> this.sql("CALL %s.system.expire_snapshots('%s')", new Object[]{this.catalogName, anotherCatalog + "." + this.tableName}));
    }

    @Test
    public void testConcurrentExpireSnapshots() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (3, 'c')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (4, 'd')", new Object[]{this.tableName});
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        List output = this.sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',max_concurrent_deletes => %s)", new Object[]{this.catalogName, currentTimestamp, this.tableIdent, 4});
        this.assertEquals("Expiring snapshots concurrently should succeed", (List)ImmutableList.of((Object)this.row(new Object[]{0L, 0L, 0L, 0L, 3L, 0L})), output);
    }

    @Test
    public void testConcurrentExpireSnapshotsWithInvalidInput() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        AssertHelpers.assertThrows((String)"Should throw an error when max_concurrent_deletes = 0", IllegalArgumentException.class, (String)"max_concurrent_deletes should have value > 0", () -> this.sql("CALL %s.system.expire_snapshots(table => '%s', max_concurrent_deletes => %s)", new Object[]{this.catalogName, this.tableIdent, 0}));
        AssertHelpers.assertThrows((String)"Should throw an error when max_concurrent_deletes < 0 ", IllegalArgumentException.class, (String)"max_concurrent_deletes should have value > 0", () -> this.sql("CALL %s.system.expire_snapshots(table => '%s', max_concurrent_deletes => %s)", new Object[]{this.catalogName, this.tableIdent, -1}));
    }

    @Test
    public void testExpireDeleteFiles() throws Exception {
        this.sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES('format-version'='2', 'write.delete.mode'='merge-on-read')", new Object[]{this.tableName});
        ArrayList records = Lists.newArrayList((Object[])new SimpleRecord[]{new SimpleRecord(Integer.valueOf(1), "a"), new SimpleRecord(Integer.valueOf(2), "b"), new SimpleRecord(Integer.valueOf(3), "c"), new SimpleRecord(Integer.valueOf(4), "d")});
        spark.createDataset((List)records, Encoders.bean(SimpleRecord.class)).coalesce(1).writeTo(this.tableName).append();
        this.sql("DELETE FROM %s WHERE id=1", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should have 1 delete manifest", (long)1L, (long)TestHelpers.deleteManifests((Table)table).size());
        Assert.assertEquals((String)"Should have 1 delete file", (long)1L, (long)TestHelpers.deleteFiles((Table)table).size());
        Path deleteManifestPath = new Path(((ManifestFile)TestHelpers.deleteManifests((Table)table).iterator().next()).path());
        Path deleteFilePath = new Path(String.valueOf(((DeleteFile)TestHelpers.deleteFiles((Table)table).iterator().next()).path()));
        this.sql("CALL %s.system.rewrite_data_files(table => '%s',options => map('delete-file-threshold','1','use-starting-sequence-number', 'false'))", new Object[]{this.catalogName, this.tableIdent});
        table.refresh();
        this.sql("INSERT INTO TABLE %s VALUES (5, 'e')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (6, 'f')", new Object[]{this.tableName});
        table.refresh();
        Assert.assertEquals((String)"Should have no delete manifests", (long)0L, (long)TestHelpers.deleteManifests((Table)table).size());
        Assert.assertEquals((String)"Should have no delete files", (long)0L, (long)TestHelpers.deleteFiles((Table)table).size());
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)new Configuration());
        Assert.assertTrue((String)"Delete manifest should still exist", (boolean)localFs.exists(deleteManifestPath));
        Assert.assertTrue((String)"Delete file should still exist", (boolean)localFs.exists(deleteFilePath));
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        List output = this.sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, currentTimestamp, this.tableIdent});
        this.assertEquals("Should deleted 1 data and pos delete file and 4 manifests and lists (one for each txn)", (List)ImmutableList.of((Object)this.row(new Object[]{1L, 1L, 0L, 4L, 4L, 0L})), output);
        Assert.assertFalse((String)"Delete manifest should be removed", (boolean)localFs.exists(deleteManifestPath));
        Assert.assertFalse((String)"Delete file should be removed", (boolean)localFs.exists(deleteFilePath));
    }

    @Test
    public void testExpireSnapshotWithStreamResultsEnabled() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should be 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        this.waitUntilAfter(table.currentSnapshot().timestampMillis());
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        List output = this.sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s',stream_results => true)", new Object[]{this.catalogName, currentTimestamp, this.tableIdent});
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), output);
    }

    @Test
    public void testExpireSnapshotsWithSnapshotId() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should be 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        long firstSnapshotId = table.currentSnapshot().parentId();
        this.sql("CALL %s.system.expire_snapshots(table => '%s',snapshot_ids => ARRAY(%d))", new Object[]{this.catalogName, this.tableIdent, firstSnapshotId});
        table.refresh();
        Assert.assertEquals((String)"Should be 1 snapshots", (long)1L, (long)Iterables.size((Iterable)table.snapshots()));
        Assert.assertEquals((String)"Snapshot ID should not be present", (long)0L, (long)Iterables.size((Iterable)Iterables.filter((Iterable)table.snapshots(), snapshot -> snapshot.snapshotId() == firstSnapshotId)));
    }

    @Test
    public void testExpireSnapshotShouldFailForCurrentSnapshot() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should be 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        AssertHelpers.assertThrows((String)"Should reject call", IllegalArgumentException.class, (String)"Cannot expire", () -> this.sql("CALL %s.system.expire_snapshots(table => '%s',snapshot_ids => ARRAY(%d, %d))", new Object[]{this.catalogName, this.tableIdent, table.currentSnapshot().snapshotId(), table.currentSnapshot().parentId()}));
    }

    @Test
    public void testExpireSnapshotsProcedureWorksWithSqlComments() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (2, 'b')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Assert.assertEquals((String)"Should be 2 snapshots", (long)2L, (long)Iterables.size((Iterable)table.snapshots()));
        this.waitUntilAfter(table.currentSnapshot().timestampMillis());
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        String callStatement = "/* CALL statement is used to expire snapshots */\n-- And we have single line comments as well \n/* And comments that span *multiple* \n lines */ CALL /* this is the actual CALL */ %s.system.expire_snapshots(   older_than => TIMESTAMP '%s',   table => '%s')";
        List output = this.sql(callStatement, new Object[]{this.catalogName, currentTimestamp, this.tableIdent});
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{0L, 0L, 0L, 0L, 1L, 0L})), output);
        table.refresh();
        Assert.assertEquals((String)"Should be 1 snapshot remaining", (long)1L, (long)Iterables.size((Iterable)table.snapshots()));
    }

    @Test
    public void testExpireSnapshotsWithStatisticFiles() throws Exception {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("INSERT INTO TABLE %s VALUES (10, 'abc')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        String statsFileLocation1 = this.statsFileLocation(table.location());
        StatisticsFile statisticsFile1 = this.writeStatsFile(table.currentSnapshot().snapshotId(), table.currentSnapshot().sequenceNumber(), statsFileLocation1, table.io());
        table.updateStatistics().setStatistics(statisticsFile1.snapshotId(), statisticsFile1).commit();
        this.sql("INSERT INTO %s SELECT 20, 'def'", new Object[]{this.tableName});
        table.refresh();
        String statsFileLocation2 = this.statsFileLocation(table.location());
        StatisticsFile statisticsFile2 = this.writeStatsFile(table.currentSnapshot().snapshotId(), table.currentSnapshot().sequenceNumber(), statsFileLocation2, table.io());
        table.updateStatistics().setStatistics(statisticsFile2.snapshotId(), statisticsFile2).commit();
        this.waitUntilAfter(table.currentSnapshot().timestampMillis());
        Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
        List output = this.sql("CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')", new Object[]{this.catalogName, currentTimestamp, this.tableIdent});
        ((ObjectAssert)Assertions.assertThat((Object)((Object[])output.get(0))[5]).as("should be 1 deleted statistics file", new Object[0])).isEqualTo((Object)1L);
        table.refresh();
        List statsWithSnapshotId1 = table.statisticsFiles().stream().filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId()).collect(Collectors.toList());
        ((ListAssert)Assertions.assertThat(statsWithSnapshotId1).as("Statistics file entry in TableMetadata should be deleted for the snapshot %s", new Object[]{statisticsFile1.snapshotId()})).isEmpty();
        ((ListAssert)Assertions.assertThat((List)table.statisticsFiles()).as("Statistics file entry in TableMetadata should be present for the snapshot %s", new Object[]{statisticsFile2.snapshotId()})).extracting(StatisticsFile::snapshotId).containsExactly((Object[])new Long[]{statisticsFile2.snapshotId()});
        ((AbstractFileAssert)Assertions.assertThat((File)new File(statsFileLocation1)).as("Statistics file should not exist for snapshot %s", new Object[]{statisticsFile1.snapshotId()})).doesNotExist();
        ((AbstractFileAssert)Assertions.assertThat((File)new File(statsFileLocation2)).as("Statistics file should exist for snapshot %s", new Object[]{statisticsFile2.snapshotId()})).exists();
    }

    private StatisticsFile writeStatsFile(long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO) throws IOException {
        try (PuffinWriter puffinWriter = Puffin.write((OutputFile)fileIO.newOutputFile(statsLocation)).build();){
            puffinWriter.add(new Blob("some-blob-type", (List)ImmutableList.of((Object)1), snapshotId, snapshotSequenceNumber, ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
            puffinWriter.finish();
            GenericStatisticsFile genericStatisticsFile = new GenericStatisticsFile(snapshotId, statsLocation, puffinWriter.fileSize(), puffinWriter.footerSize(), (List)puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).collect(ImmutableList.toImmutableList()));
            return genericStatisticsFile;
        }
    }

    private String statsFileLocation(String tableLocation) {
        String statsFileName = "stats-file-" + UUID.randomUUID();
        return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
    }
}

