/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.extensions.ExtensionsTestBase;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestChangelogTable
extends ExtensionsTestBase {
    @Parameter(index=3)
    private int formatVersion;

    @Parameters(name="catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}")
    public static Object[][] parameters() {
        return new Object[][]{{SparkCatalogConfig.SPARK.catalogName(), SparkCatalogConfig.SPARK.implementation(), SparkCatalogConfig.SPARK.properties(), 1}, {SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), SparkCatalogConfig.HIVE.properties(), 2}};
    }

    @AfterEach
    public void removeTables() {
        this.sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
    }

    @TestTemplate
    public void testDataFilters() {
        this.createTableWithDefaultRows();
        this.sql("INSERT INTO %s VALUES (3, 'c')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap3 = table.currentSnapshot();
        this.sql("DELETE FROM %s WHERE id = 3", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap4 = table.currentSnapshot();
        this.assertEquals("Should have expected rows", (List)ImmutableList.of((Object)this.row(new Object[]{3, "c", "INSERT", 2, snap3.snapshotId()}), (Object)this.row(new Object[]{3, "c", "DELETE", 3, snap4.snapshotId()})), this.sql("SELECT * FROM %s.changes WHERE id = 3 ORDER BY _change_ordinal, id", new Object[]{this.tableName}));
    }

    @TestTemplate
    public void testOverwrites() {
        this.createTableWithDefaultRows();
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap2 = table.currentSnapshot();
        this.sql("INSERT OVERWRITE %s VALUES (-2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        this.assertEquals("Rows should match", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "DELETE", 0, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "b", "INSERT", 0, snap3.snapshotId()})), this.changelogRecords(snap2, snap3));
    }

    @TestTemplate
    public void testQueryWithTimeRange() {
        this.createTable();
        this.sql("INSERT INTO %s VALUES (1, 'a')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap1 = table.currentSnapshot();
        long rightAfterSnap1 = this.waitUntilAfter(snap1.timestampMillis());
        this.sql("INSERT INTO %s VALUES (2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap2 = table.currentSnapshot();
        long rightAfterSnap2 = this.waitUntilAfter(snap2.timestampMillis());
        this.sql("INSERT OVERWRITE %s VALUES (-2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        long rightAfterSnap3 = this.waitUntilAfter(snap3.timestampMillis());
        this.assertEquals("Should have expected changed rows only from snapshot 3", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "DELETE", 0, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "b", "INSERT", 0, snap3.snapshotId()})), this.changelogRecords(rightAfterSnap2, snap3.timestampMillis()));
        this.assertEquals("Should have expected changed rows only from snapshot 3", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "DELETE", 0, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "b", "INSERT", 0, snap3.snapshotId()})), this.changelogRecords(snap2.timestampMillis(), snap3.timestampMillis()));
        this.assertEquals("Should have expected changed rows from snapshot 2 and 3", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "INSERT", 0, snap2.snapshotId()}), (Object)this.row(new Object[]{2, "b", "DELETE", 1, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "b", "INSERT", 1, snap3.snapshotId()})), this.changelogRecords(rightAfterSnap1, snap3.timestampMillis()));
        this.assertEquals("Should have expected changed rows up to the current snapshot", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "INSERT", 0, snap2.snapshotId()}), (Object)this.row(new Object[]{2, "b", "DELETE", 1, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "b", "INSERT", 1, snap3.snapshotId()})), this.changelogRecords(rightAfterSnap1, null));
        this.assertEquals("Should have empty changed rows if end time is before the first snapshot", (List)ImmutableList.of(), this.changelogRecords(null, snap1.timestampMillis() - 1L));
        this.assertEquals("Should have empty changed rows if start time is after the current snapshot", (List)ImmutableList.of(), this.changelogRecords(rightAfterSnap3, null));
        this.assertEquals("Should have empty changed rows if end time is before the first snapshot", (List)ImmutableList.of(), this.changelogRecords(null, snap1.timestampMillis() - 1L));
        this.assertEquals("Should have empty changed rows if there are no snapshots between start time and end time", (List)ImmutableList.of(), this.changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1L));
    }

    @TestTemplate
    public void testTimeRangeValidation() {
        this.createTableWithDefaultRows();
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap2 = table.currentSnapshot();
        this.sql("INSERT OVERWRITE %s VALUES (-2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        long rightAfterSnap3 = this.waitUntilAfter(snap3.timestampMillis());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.changelogRecords(snap3.timestampMillis(), snap2.timestampMillis())).isInstanceOf(IllegalArgumentException.class)).hasMessage("Cannot set start-timestamp to be greater than end-timestamp for changelogs");
    }

    @TestTemplate
    public void testMetadataDeletes() {
        this.createTableWithDefaultRows();
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap2 = table.currentSnapshot();
        this.sql("DELETE FROM %s WHERE data = 'a'", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        ((AbstractStringAssert)Assertions.assertThat((String)snap3.operation()).as("Operation must match", new Object[0])).isEqualTo("delete");
        this.assertEquals("Rows should match", (List)ImmutableList.of((Object)this.row(new Object[]{1, "a", "DELETE", 0, snap3.snapshotId()})), this.changelogRecords(snap2, snap3));
    }

    @TestTemplate
    public void testExistingEntriesInNewDataManifestsAreIgnored() {
        this.sql("CREATE TABLE %s (id INT, data STRING) USING iceberg PARTITIONED BY (data) TBLPROPERTIES (  '%s' = '%d',  '%s' = '1',  '%s' = 'true' )", new Object[]{this.tableName, "format-version", this.formatVersion, "commit.manifest.min-count-to-merge", "commit.manifest-merge.enabled"});
        this.sql("INSERT INTO %s VALUES (1, 'a')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap1 = table.currentSnapshot();
        this.sql("INSERT INTO %s VALUES (2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap2 = table.currentSnapshot();
        ((ListAssert)Assertions.assertThat((List)snap2.dataManifests(table.io())).as("Manifest number must match", new Object[0])).hasSize(1);
        this.assertEquals("Rows should match", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "INSERT", 0, snap2.snapshotId()})), this.changelogRecords(snap1, snap2));
    }

    @TestTemplate
    public void testManifestRewritesAreIgnored() {
        this.createTableWithDefaultRows();
        this.sql("CALL %s.system.rewrite_manifests('%s')", new Object[]{this.catalogName, this.tableIdent});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        ((IterableAssert)Assertions.assertThat((Iterable)table.snapshots()).as("Num snapshots must match", new Object[0])).hasSize(3);
        this.assertEquals("Should have expected rows", (List)ImmutableList.of((Object)this.row(new Object[]{1, "INSERT"}), (Object)this.row(new Object[]{2, "INSERT"})), this.sql("SELECT id, _change_type FROM %s.changes ORDER BY id", new Object[]{this.tableName}));
    }

    @TestTemplate
    public void testMetadataColumns() {
        this.createTableWithDefaultRows();
        List rows = this.sql("SELECT id, _file, _pos, _deleted, _spec_id, _partition FROM %s.changes ORDER BY id", new Object[]{this.tableName});
        String file1 = ((Object[])rows.get(0))[1].toString();
        Assertions.assertThat((String)file1).startsWith((CharSequence)"file:/");
        String file2 = ((Object[])rows.get(1))[1].toString();
        this.assertEquals("Rows should match", (List)ImmutableList.of((Object)this.row(new Object[]{1, file1, 0L, false, 0, this.row(new Object[]{"a"})}), (Object)this.row(new Object[]{2, file2, 0L, false, 0, this.row(new Object[]{"b"})})), rows);
    }

    @TestTemplate
    public void testQueryWithRollback() {
        this.createTable();
        this.sql("INSERT INTO %s VALUES (1, 'a')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot snap1 = table.currentSnapshot();
        long rightAfterSnap1 = this.waitUntilAfter(snap1.timestampMillis());
        this.sql("INSERT INTO %s VALUES (2, 'b')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap2 = table.currentSnapshot();
        long rightAfterSnap2 = this.waitUntilAfter(snap2.timestampMillis());
        this.sql("CALL %s.system.rollback_to_snapshot('%s', %d)", new Object[]{this.catalogName, this.tableIdent, snap1.snapshotId()});
        table.refresh();
        Assertions.assertThat((Object)table.currentSnapshot()).isEqualTo((Object)snap1);
        this.sql("INSERT OVERWRITE %s VALUES (-2, 'a')", new Object[]{this.tableName});
        table.refresh();
        Snapshot snap3 = table.currentSnapshot();
        long rightAfterSnap3 = this.waitUntilAfter(snap3.timestampMillis());
        this.assertEquals("Should have expected changed rows up to snapshot 3", (List)ImmutableList.of((Object)this.row(new Object[]{1, "a", "INSERT", 0, snap1.snapshotId()}), (Object)this.row(new Object[]{1, "a", "DELETE", 1, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "a", "INSERT", 1, snap3.snapshotId()})), this.changelogRecords(null, rightAfterSnap3));
        this.assertEquals("Should have expected changed rows up to snapshot 2", (List)ImmutableList.of((Object)this.row(new Object[]{1, "a", "INSERT", 0, snap1.snapshotId()})), this.changelogRecords(null, rightAfterSnap2));
        this.assertEquals("Should have expected changed rows from snapshot 3 only since snapshot 2 is on a different branch.", (List)ImmutableList.of((Object)this.row(new Object[]{1, "a", "DELETE", 0, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "a", "INSERT", 0, snap3.snapshotId()})), this.changelogRecords(rightAfterSnap1, snap3.timestampMillis()));
        this.assertEquals("Should have expected changed rows from snapshot 3", (List)ImmutableList.of((Object)this.row(new Object[]{1, "a", "DELETE", 0, snap3.snapshotId()}), (Object)this.row(new Object[]{-2, "a", "INSERT", 0, snap3.snapshotId()})), this.changelogRecords(rightAfterSnap2, null));
        this.sql("CALL %s.system.set_current_snapshot('%s', %d)", new Object[]{this.catalogName, this.tableIdent, snap2.snapshotId()});
        table.refresh();
        Assertions.assertThat((Object)table.currentSnapshot()).isEqualTo((Object)snap2);
        this.assertEquals("Should have expected changed rows from snapshot 2 only since snapshot 3 is on a different branch.", (List)ImmutableList.of((Object)this.row(new Object[]{2, "b", "INSERT", 0, snap2.snapshotId()})), this.changelogRecords(rightAfterSnap1, null));
    }

    private void createTableWithDefaultRows() {
        this.createTable();
        this.insertDefaultRows();
    }

    private void createTable() {
        this.sql("CREATE TABLE %s (id INT, data STRING) USING iceberg PARTITIONED BY (data) TBLPROPERTIES (  '%s' = '%d' )", new Object[]{this.tableName, "format-version", this.formatVersion});
    }

    private void insertDefaultRows() {
        this.sql("INSERT INTO %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.sql("INSERT INTO %s VALUES (2, 'b')", new Object[]{this.tableName});
    }

    private List<Object[]> changelogRecords(Snapshot startSnapshot, Snapshot endSnapshot) {
        DataFrameReader reader = spark.read();
        if (startSnapshot != null) {
            reader = reader.option("start-snapshot-id", startSnapshot.snapshotId());
        }
        if (endSnapshot != null) {
            reader = reader.option("end-snapshot-id", endSnapshot.snapshotId());
        }
        return this.rowsToJava(this.collect(reader));
    }

    private List<Object[]> changelogRecords(Long startTimestamp, Long endTimeStamp) {
        DataFrameReader reader = spark.read();
        if (startTimestamp != null) {
            reader = reader.option("start-timestamp", startTimestamp.longValue());
        }
        if (endTimeStamp != null) {
            reader = reader.option("end-timestamp", endTimeStamp.longValue());
        }
        return this.rowsToJava(this.collect(reader));
    }

    private List<Row> collect(DataFrameReader reader) {
        return reader.table(this.tableName + ".changes").orderBy("_change_ordinal", new String[]{"_commit_snapshot_id", "_change_type", "id"}).collectAsList();
    }

    @TestTemplate
    public void testChangelogViewOutsideTimeRange() {
        this.createTableWithDefaultRows();
        this.sql("INSERT INTO %s VALUES (3, 'c')", new Object[]{this.tableName});
        this.sql("INSERT INTO %s VALUES (4, 'd')", new Object[]{this.tableName});
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot insertSnapshot = table.currentSnapshot();
        long beforeWindowTime = System.currentTimeMillis();
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Test interrupted", e);
        }
        long startTime = System.currentTimeMillis();
        long endTime = startTime + 1000L;
        this.sql("CALL %s.system.create_changelog_view(  table => '%s',   options => map(    'start-timestamp', '%d',    'end-timestamp', '%d'  ),  changelog_view => 'test_changelog_view')", new Object[]{this.catalogName, this.tableName, startTime, endTime});
        List results = this.sql("SELECT * FROM test_changelog_view WHERE _change_type IN ('INSERT', 'DELETE') ORDER BY _change_ordinal", new Object[0]);
        ((ListAssert)Assertions.assertThat((List)results).as("Num records must be zero", new Object[0])).isEmpty();
        this.sql("DROP VIEW IF EXISTS test_changelog_view", new Object[0]);
    }
}

