/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.actions;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.actions.RewriteTablePathSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.SparkEnv;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockInfoManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BroadcastBlockId;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import scala.Tuple2;

public class TestRewriteTablePathsAction
extends TestBase {
    // Temporary directories injected by JUnit through @TempDir.
    @TempDir
    private Path staging;
    // Directory whose URI becomes tableLocation in setupTableLocation().
    @TempDir
    private Path tableDir;
    @TempDir
    private Path newTableDir;
    @TempDir
    private Path targetTableDir;
    // Static HadoopTables instance used to create and load path-based tables.
    private static final HadoopTables TABLES = new HadoopTables(new Configuration());
    // Three optional columns: c1 (integer, id 1), c2 (string, id 2), c3 (string, id 3).
    protected static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"c1", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"c2", (Type)Types.StringType.get()), Types.NestedField.optional((int)3, (String)"c3", (Type)Types.StringType.get())});
    // Location of the default source table; assigned in setupTableLocation().
    protected String tableLocation = null;
    // Default source table (two snapshots); assigned in setupTableLocation().
    private Table table = null;
    // Database names; the same values are used by createNameSpaces()/dropNameSpaces().
    private final String ns = "testns";
    private final String backupNs = "backupns";

    /**
     * Supplies the actions entry point used by every test in this class.
     *
     * @return the provider obtained from {@code SparkActions.get()}
     */
    protected ActionsProvider actions() {
        ActionsProvider provider = SparkActions.get();
        return provider;
    }

    /**
     * Prepares the per-test fixture: resolves the source table location from the temp
     * directory, creates the two-snapshot source table there and creates the namespaces.
     */
    @BeforeEach
    public void setupTableLocation() {
        URI tableUri = tableDir.toFile().toURI();
        tableLocation = tableUri.toString();
        table = createATableWith2Snapshots(tableLocation);
        createNameSpaces();
    }

    /** Tears down the per-test fixture by dropping the namespaces created during setup. */
    @AfterEach
    public void cleanupTableSetup() {
        dropNameSpaces();
    }

    /**
     * Creates a table at {@code location} and writes two snapshots to it.
     *
     * @param location table location
     * @return the created table handle
     */
    private Table createATableWith2Snapshots(String location) {
        final int snapshotCount = 2;
        return createTableWithSnapshots(location, snapshotCount);
    }

    /**
     * Creates a table at {@code location} with an empty property map and writes
     * {@code snapshotNumber} snapshots to it.
     *
     * @param location table location
     * @param snapshotNumber number of writes to perform
     * @return the created table handle
     */
    private Table createTableWithSnapshots(String location, int snapshotNumber) {
        Map<String, String> noProperties = Maps.newHashMap();
        return createTableWithSnapshots(location, snapshotNumber, noProperties);
    }

    /**
     * Creates a table at {@code location} with the given properties and writes
     * {@code snapshotNumber} snapshots using the {@code "append"} save mode.
     *
     * @param location table location
     * @param snapshotNumber number of writes to perform
     * @param properties table properties passed to table creation
     * @return the created table handle
     */
    protected Table createTableWithSnapshots(String location, int snapshotNumber, Map<String, String> properties) {
        final String writeMode = "append";
        return createTableWithSnapshots(location, snapshotNumber, properties, writeMode);
    }

    /**
     * Creates an unpartitioned table with {@link #SCHEMA} at {@code location} and writes the
     * same single-row dataframe to it {@code snapshotNumber} times.
     *
     * @param location table location, also used as the write target
     * @param snapshotNumber number of writes to perform
     * @param properties table properties passed to table creation
     * @param mode Spark save mode used for every write
     * @return the handle returned by table creation (not refreshed after the writes)
     */
    private Table createTableWithSnapshots(String location, int snapshotNumber, Map<String, String> properties, String mode) {
        Table created = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location);

        List<ThreeColumnRecord> rows = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        // coalesce(1) collapses the dataframe to a single partition before writing.
        Dataset<?> singleRow = spark.createDataFrame(rows, ThreeColumnRecord.class).coalesce(1);

        int written = 0;
        while (written < snapshotNumber) {
            singleRow.select("c1", "c2", "c3")
                    .write()
                    .format("iceberg")
                    .mode(mode)
                    .save(location);
            written++;
        }
        return created;
    }

    /**
     * Creates the two databases used by the tests if they do not exist yet.
     *
     * <p>The names come from the {@code ns} and {@code backupNs} fields so they are defined in
     * a single place.
     */
    private void createNameSpaces() {
        this.sql("CREATE DATABASE IF NOT EXISTS %s", this.ns);
        this.sql("CREATE DATABASE IF NOT EXISTS %s", this.backupNs);
    }

    /**
     * Drops the two test databases, cascading to their contents, if they exist.
     *
     * <p>The names come from the {@code ns} and {@code backupNs} fields so they are defined in
     * a single place.
     */
    private void dropNameSpaces() {
        this.sql("DROP DATABASE IF EXISTS %s CASCADE", this.ns);
        this.sql("DROP DATABASE IF EXISTS %s CASCADE", this.backupNs);
    }

    /**
     * Rewrites the source table up to {@code v3.metadata.json}, copies the result and checks
     * that the files metadata table of the target lists two paths under the target location
     * and that both tables return the same rows.
     */
    @Test
    public void testRewritePath() throws Exception {
        String targetTableLocation = this.targetTableLocation();

        // Sanity check: the source table's files metadata table lists two entries.
        List<String> validDataFiles = spark.read()
                .format("iceberg")
                .load(this.tableLocation + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(validDataFiles.size()).isEqualTo(2);

        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(this.table)
                .rewriteLocationPrefix(this.tableLocation, targetTableLocation)
                .endVersion("v3.metadata.json")
                .execute();
        Assertions.assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
        this.checkFileNum(3, 2, 2, 9, result);

        this.copyTableFiles(result);

        // Typed as List<String> so the predicate below can call String methods on each path.
        List<String> validDataFilesAfterRebuilt = spark.read()
                .format("iceberg")
                .load(targetTableLocation + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(validDataFilesAfterRebuilt)
                .hasSize(2)
                .allMatch(item -> item.startsWith(targetTableLocation));

        List<Object[]> actual = this.rows(targetTableLocation);
        List<Object[]> expected = this.rows(this.tableLocation);
        this.assertEquals("Rows should match after copy", expected, actual);
    }

    /**
     * Using the same value for source and target prefix must be rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testSameLocations() {
        Assertions.assertThatThrownBy(
                        () -> actions()
                                .rewriteTablePath(table)
                                .rewriteLocationPrefix(tableLocation, tableLocation)
                                .endVersion("v1.metadata.json")
                                .execute())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Source prefix cannot be the same as target prefix");
    }

    /**
     * Rewrites starting from {@code v2.metadata.json} and checks, via the generated file list,
     * that exactly one target path mentions the current snapshot id and none mentions the
     * parent snapshot id.
     */
    @Test
    public void testStartVersion() throws Exception {
        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, targetTableLocation())
                .startVersion("v2.metadata.json")
                .execute();
        checkFileNum(1, 1, 1, 4, result);

        List<Tuple2<String, String>> pathPairs = readPathPairList(result.fileListLocation());

        // The second element of each pair is the one inspected for snapshot ids.
        String currentSnapshotId = String.valueOf(table.currentSnapshot().snapshotId());
        long currentMatches = pathPairs.stream()
                .filter(pair -> pair._2().contains(currentSnapshotId))
                .count();
        Assertions.assertThat(currentMatches)
                .withFailMessage("Should have the current snapshot file")
                .isEqualTo(1L);

        String parentSnapshotId = String.valueOf(table.currentSnapshot().parentId());
        long parentMatches = pathPairs.stream()
                .filter(pair -> pair._2().contains(parentSnapshotId))
                .count();
        Assertions.assertThat(parentMatches)
                .withFailMessage("Should NOT have the parent snapshot file")
                .isEqualTo(0L);
    }

    /**
     * Performs a full rewrite and copy of a one-row table, appends a second row to the source,
     * then rewrites again starting from the target's current metadata file name and checks the
     * sorted rows of source and target match.
     */
    @Test
    public void testIncrementalRewrite() throws Exception {
        String location = newTableLocation();
        Map<String, String> noProperties = Maps.newHashMap();
        Table sourceTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), noProperties, location);

        // First append: one row in the source table.
        List<ThreeColumnRecord> firstBatch = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> firstFrame = spark.createDataFrame(firstBatch, ThreeColumnRecord.class).coalesce(1);
        firstFrame.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
        Assertions.assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(1L);

        // Full rewrite and copy to the target location.
        RewriteTablePath.Result fullResult = actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
                .execute();
        copyTableFiles(fullResult);
        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1L);

        // Second append: the source now holds two rows.
        List<ThreeColumnRecord> secondBatch = Lists.newArrayList(new ThreeColumnRecord(2, "BBBBBBBBB", "BBB"));
        Dataset<?> secondFrame = spark.createDataFrame(secondBatch, ThreeColumnRecord.class).coalesce(1);
        secondFrame.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
        Assertions.assertThat(spark.read().format("iceberg").load(location).count()).isEqualTo(2L);
        sourceTable.refresh();

        // The incremental rewrite starts at the metadata file the target currently points to.
        Table targetTable = TABLES.load(targetTableLocation());
        String targetMetadataLocation = currentMetadata(targetTable).metadataFileLocation();
        String startVersion = TestRewriteTablePathsAction.fileName(targetMetadataLocation);
        RewriteTablePath.Result incrementalResult = actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
                .startVersion(startVersion)
                .execute();
        copyTableFiles(incrementalResult);

        List<Object[]> actual = rowsSorted(targetTableLocation(), "c1");
        List<Object[]> expected = rowsSorted(location, "c1");
        assertEquals("Rows should match after copy", expected, actual);
    }

    /**
     * Rewrites a three-snapshot table twice, once from {@code v2.metadata.json} and once from
     * {@code v1.metadata.json}, each into its own temp directory, and checks the file counts.
     */
    @Test
    public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2) throws Exception {
        String location = newTableLocation();
        Table threeSnapshotTable = createTableWithSnapshots(location, 3);

        RewriteTablePath.Result fromV2 = actions()
                .rewriteTablePath(threeSnapshotTable)
                .rewriteLocationPrefix(location, toAbsolute(location1))
                .startVersion("v2.metadata.json")
                .execute();
        checkFileNum(2, 2, 2, 8, fromV2);

        RewriteTablePath.Result fromV1 = actions()
                .rewriteTablePath(threeSnapshotTable)
                .rewriteLocationPrefix(location, toAbsolute(location2))
                .startVersion("v1.metadata.json")
                .execute();
        checkFileNum(3, 3, 3, 12, fromV1);
    }

    /** Rewrites the whole default table without version bounds and checks the file counts. */
    @Test
    public void testFullTableRewritePath() throws Exception {
        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, targetTableLocation())
                .execute();
        checkFileNum(3, 2, 2, 9, result);
    }

    /**
     * Deletes the first listed data file from the source table, rewrites and copies it, and
     * checks that the target table reads exactly one row.
     */
    @Test
    public void testDeleteDataFile() throws Exception {
        List<String> dataFilePaths = spark.read()
                .format("iceberg")
                .load(table.location() + "#files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();

        String firstDataFile = dataFilePaths.stream().findFirst().get();
        table.newDelete().deleteFile(firstDataFile).commit();

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .stagingLocation(stagingLocation())
                .execute();
        checkFileNum(4, 3, 3, 12, result);
        copyTableFiles(result);

        Dataset<?> resultDF = spark.read().format("iceberg").load(targetTableLocation());
        long remainingRows = resultDF.as(Encoders.bean(ThreeColumnRecord.class)).count();
        Assertions.assertThat(remainingRows)
                .withFailMessage("There are only one row left since we deleted a data file")
                .isEqualTo(1L);
    }

    /**
     * Adds a position delete for position 0 of a data file added by the current snapshot,
     * rewrites and copies the table, and checks the target reads one row like the source.
     */
    @Test
    public void testPositionDeletes() throws Exception {
        CharSequence addedDataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();
        List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(Pair.of(addedDataFile, 0L));

        // The delete file is written into a nested directory below the table's data folder.
        File deleteFilePath = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
        OutputFile deleteOutput = table.io().newOutputFile(deleteFilePath.toURI().toString());
        DeleteFile positionDeletes = FileHelpers.writeDeleteFile(table, deleteOutput, deletes).first();
        table.newRowDelta().addDeletes(positionDeletes).commit();

        Assertions.assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1L);

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1L);
    }

    /**
     * Writes a position delete that also carries the deleted row, rewrites and copies the
     * table, and checks that the row stored in the target's {@code position_deletes} metadata
     * table equals the original row.
     */
    @Test
    public void testPositionDeleteWithRow() throws Exception {
        String dataFileLocation = table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location();

        List<PositionDelete<?>> deletes = Lists.newArrayList();
        File deleteFilePath = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
        OutputFile deleteFile = table.io().newOutputFile(deleteFilePath.toURI().toString());
        deletes.add(positionDelete(SCHEMA, dataFileLocation, 0L, 1, "AAAAAAAAAA", "AAAA"));

        DeleteFile positionDeletes = FileHelpers.writePosDeleteFile(table, deleteFile, null, deletes);
        table.newRowDelta().addDeletes(positionDeletes).commit();

        Assertions.assertThat(spark.read().format("iceberg").load(table.location()).count()).isEqualTo(1L);

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        // Index 2 of the first position_deletes row holds the deleted row's values.
        List<Object[]> positionDeleteRows = rows(targetTableLocation() + "#position_deletes");
        Object[] deletedRow = (Object[]) positionDeleteRows.get(0)[2];
        assertEquals("Position deletes should be equal", new Object[] {1, "AAAAAAAAAA", "AAAA"}, deletedRow);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1L);
    }

    /**
     * Writes one delete file holding a position delete (position 0) for every data file added
     * by any snapshot, rewrites and copies the table, and checks that both source and target
     * read zero rows.
     */
    @Test
    public void testPositionDeletesAcrossFiles() throws Exception {
        // The stream elements are the data files added by each snapshot, so the element type
        // must be DataFile for location() to resolve.
        Stream<DataFile> allFiles = StreamSupport.stream(this.table.snapshots().spliterator(), false)
                .flatMap(s -> StreamSupport.stream(s.addedDataFiles(this.table.io()).spliterator(), false));
        List<Pair<CharSequence, Long>> deletes = allFiles
                .map(f -> Pair.of((CharSequence) f.location(), 0L))
                .collect(Collectors.toList());
        Assertions.assertThat(deletes.size()).isEqualTo(2);

        File file = new File(this.removePrefix(this.table.location() + "/data/deeply/nested/file.parquet"));
        DeleteFile positionDeletes = FileHelpers.writeDeleteFile(
                        this.table, this.table.io().newOutputFile(file.toURI().toString()), deletes)
                .first();
        this.table.newRowDelta().addDeletes(positionDeletes).commit();

        Assertions.assertThat(spark.read().format("iceberg").load(this.table.location()).count()).isEqualTo(0L);

        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(this.table)
                .stagingLocation(this.stagingLocation())
                .rewriteLocationPrefix(this.table.location(), this.targetTableLocation())
                .execute();
        this.checkFileNum(4, 3, 3, 13, result);
        this.copyTableFiles(result);

        Assertions.assertThat(spark.read().format("iceberg").load(this.targetTableLocation()).count()).isEqualTo(0L);
    }

    /**
     * Appends four rows to a one-snapshot table, adds an equality delete file on column
     * {@code c2} for two values, rewrites and copies the table, and checks the target reads
     * two rows.
     */
    @Test
    public void testEqualityDeletes() throws Exception {
        Table sourceTable = createTableWithSnapshots(newTableLocation(), 1);

        List<ThreeColumnRecord> extraRows = Lists.newArrayList(
                new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
                new ThreeColumnRecord(3, "BBBBBBBBBB", "BBBB"),
                new ThreeColumnRecord(4, "CCCCCCCCCC", "CCCC"),
                new ThreeColumnRecord(5, "DDDDDDDDDD", "DDDD"));
        spark.createDataFrame(extraRows, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .save(newTableLocation());

        // Equality deletes are keyed on c2 only.
        Schema deleteRowSchema = sourceTable.schema().select("c2");
        Record deleteTemplate = GenericRecord.create(deleteRowSchema);
        List<Record> dataDeletes = Lists.newArrayList(
                deleteTemplate.copy("c2", "AAAAAAAAAA"),
                deleteTemplate.copy("c2", "CCCCCCCCCC"));

        File deleteFilePath = new File(removePrefix(sourceTable.location()) + "/data/deeply/nested/file.parquet");
        OutputFile deleteOutput = sourceTable.io().newOutputFile(deleteFilePath.toURI().toString());
        DeleteFile equalityDeletes =
                FileHelpers.writeDeleteFile(sourceTable, deleteOutput, TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
        sourceTable.newRowDelta().addDeletes(equalityDeletes).commit();

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(sourceTable)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(newTableLocation(), targetTableLocation())
                .execute();
        checkFileNum(4, 3, 3, 13, result);
        copyTableFiles(result);

        Assertions.assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(2L);
    }

    /**
     * Expires the snapshot referenced by {@code v2.metadata.json} of a fresh two-snapshot
     * table, appends 100 more snapshots, and checks the counts of a full rewrite.
     *
     * <p>The expected counts are derived from the number of writes, minus one version file and
     * one expired manifest list.
     */
    @Test
    public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
        int initialSnapshots = 2;
        int additionalAppends = 100;

        String location = this.newTableLocation();
        Table sourceTable = this.createTableWithSnapshots(location, initialSnapshots);

        // Read the v2 metadata through the source table's own FileIO rather than through the
        // unrelated fixture table.
        Table staticTable = this.newStaticTable(location + "metadata/v2.metadata.json", sourceTable.io());
        int expiredManifestListCount = 1;
        ExpireSnapshots.Result expireResult = this.actions()
                .expireSnapshots(sourceTable)
                .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
                .execute();
        Assertions.assertThat(expireResult.deletedManifestListsCount()).isEqualTo((long) expiredManifestListCount);

        List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
        for (int i = 0; i < additionalAppends; ++i) {
            df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
        }
        sourceTable.refresh();

        // Total number of writes against the table: the initial snapshots plus the appends.
        int totalIteration = initialSnapshots + additionalAppends;
        int missingVersionFile = 1;
        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(sourceTable)
                .stagingLocation(this.stagingLocation())
                .rewriteLocationPrefix(location, this.targetTableLocation())
                .execute();
        this.checkFileNum(
                totalIteration - missingVersionFile,
                totalIteration - expiredManifestListCount,
                totalIteration,
                totalIteration * 4 - missingVersionFile - expiredManifestListCount,
                result);
    }

    /** Rewrites only up to {@code v1.metadata.json} and checks the resulting file counts. */
    @Test
    public void testRewritePathWithoutSnapshot() throws Exception {
        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(tableLocation, newTableLocation())
                .endVersion("v1.metadata.json")
                .execute();
        checkFileNum(1, 0, 0, 1, result);
    }

    /**
     * Expires the parent of the current snapshot, then performs a full rewrite and checks the
     * file counts.
     */
    @Test
    public void testExpireSnapshotBeforeRewrite() throws Exception {
        ExpireSnapshots expire = actions().expireSnapshots(table);
        long parentSnapshotId = table.currentSnapshot().parentId();
        expire.expireSnapshotId(parentSnapshotId).execute();

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .execute();
        checkFileNum(4, 1, 2, 9, result);
    }

    /**
     * Builds a table with three {@code overwrite} snapshots, expires the oldest one, rewrites
     * and copies the table, and checks that the expired snapshot's data file is absent from
     * the target's {@code all_files} table while its rewritten path still appears in an
     * {@code all_entries} row with status 2.
     */
    @Test
    public void testRewritePathWithNonLiveEntry() throws Exception {
        String location = this.newTableLocation();
        Table tableWith3Snaps = this.createTableWithSnapshots(location, 3, Maps.newHashMap(), "overwrite");

        Snapshot oldest = SnapshotUtil.oldestAncestor(tableWith3Snaps);
        String oldestDataFilePath = Iterables.getOnlyElement(
                        tableWith3Snaps.snapshot(oldest.snapshotId()).addedDataFiles(tableWith3Snaps.io()))
                .location();
        // Where the oldest data file would live under the target location.
        String deletedDataFilePathInTargetLocation = String.format(
                "%sdata/%s", this.targetTableLocation(), TestRewriteTablePathsAction.fileName(oldestDataFilePath));

        ExpireSnapshots.Result expireResult = this.actions()
                .expireSnapshots(tableWith3Snaps)
                .expireSnapshotId(oldest.snapshotId())
                .execute();
        // Typed varargs extractors: a raw Function[] cannot hold these method references.
        Assertions.assertThat(expireResult)
                .as("Should deleted 1 data files in root snapshot")
                .extracting(
                        ExpireSnapshots.Result::deletedManifestListsCount,
                        ExpireSnapshots.Result::deletedManifestsCount,
                        ExpireSnapshots.Result::deletedDataFilesCount)
                .contains(1L, 1L, 1L);

        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(tableWith3Snaps)
                .stagingLocation(this.stagingLocation())
                .rewriteLocationPrefix(tableWith3Snaps.location(), this.targetTableLocation())
                .execute();
        this.checkFileNum(5, 2, 4, 13, result);
        this.copyTableFiles(result);

        List<String> copiedDataFiles = spark.read()
                .format("iceberg")
                .load(this.targetTableLocation() + "#all_files")
                .select("file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(copiedDataFiles)
                .hasSize(2)
                .doesNotContain(deletedDataFilePathInTargetLocation);

        List<String> copiedEntries = spark.read()
                .format("iceberg")
                .load(this.targetTableLocation() + "#all_entries")
                .filter("status == 2")
                .select("data_file.file_path")
                .as(Encoders.STRING())
                .collectAsList();
        Assertions.assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
    }

    /**
     * Expires the parent snapshot so that a single snapshot remains, then rewrites starting
     * from {@code v2.metadata.json} and checks the file counts.
     */
    @Test
    public void testStartSnapshotWithoutValidSnapshot() throws Exception {
        ExpireSnapshots expire = actions().expireSnapshots(table);
        long parentSnapshotId = table.currentSnapshot().parentId();
        expire.expireSnapshotId(parentSnapshotId).execute();

        Assertions.assertThat(table.snapshots()).hasSize(1);

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .stagingLocation(stagingLocation())
                .startVersion("v2.metadata.json")
                .execute();
        checkFileNum(2, 1, 1, 5, result);
    }

    /**
     * Expires the parent snapshot, then rewrites starting from {@code v3.metadata.json} and
     * checks the file counts.
     */
    @Test
    public void testMoveTheVersionExpireSnapshot() throws Exception {
        ExpireSnapshots expire = actions().expireSnapshots(table);
        long parentSnapshotId = table.currentSnapshot().parentId();
        expire.expireSnapshotId(parentSnapshotId).execute();

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), targetTableLocation())
                .stagingLocation(stagingLocation())
                .startVersion("v3.metadata.json")
                .execute();
        checkFileNum(1, 0, 0, 1, result);
    }

    /**
     * After the parent snapshot has been expired, rewriting with end version
     * {@code v3.metadata.json} must fail with an {@link UnsupportedOperationException}.
     */
    @Test
    public void testMoveVersionWithInvalidSnapshots() {
        ExpireSnapshots expire = actions().expireSnapshots(table);
        long parentSnapshotId = table.currentSnapshot().parentId();
        expire.expireSnapshotId(parentSnapshotId).execute();

        Assertions.assertThatThrownBy(
                        () -> actions()
                                .rewriteTablePath(table)
                                .rewriteLocationPrefix(table.location(), newTableLocation())
                                .stagingLocation(stagingLocation())
                                .endVersion("v3.metadata.json")
                                .execute())
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. Please choose an earlier version without invalid snapshots.");
    }

    /**
     * Sets the current snapshot back to the parent, appends one more row, sets the current
     * snapshot to the original second snapshot again, and checks the counts of a full rewrite.
     */
    @Test
    public void testRollBack() throws Exception {
        long secondSnapshotId = table.currentSnapshot().snapshotId();

        // Point the table at the first snapshot.
        table.manageSnapshots()
                .setCurrentSnapshot(table.currentSnapshot().parentId())
                .commit();

        // Append one row while the first snapshot is current.
        List<ThreeColumnRecord> rowsToAppend = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
        Dataset<?> appendFrame = spark.createDataFrame(rowsToAppend, ThreeColumnRecord.class).coalesce(1);
        appendFrame.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(table.location());
        table.refresh();

        // Point the table back at the snapshot that was current at the start.
        table.manageSnapshots().setCurrentSnapshot(secondSnapshotId).commit();

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), newTableLocation())
                .stagingLocation(stagingLocation())
                .execute();
        checkFileNum(6, 3, 3, 15, result);
    }

    /**
     * Enables {@code write.wap.enabled} on the table, sets {@code spark.wap.id} on the
     * session, appends one row, and checks the counts of a full rewrite.
     *
     * <p>{@code spark.wap.id} is set on the shared {@code spark} session, so it is unset in a
     * {@code finally} block to keep the setting from leaking into other tests.
     */
    @Test
    public void testWriteAuditPublish() throws Exception {
        this.table.updateProperties().set("write.wap.enabled", "true").commit();
        spark.conf().set("spark.wap.id", "1");
        try {
            List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
            Dataset<?> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
            df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(this.table.location());
            this.table.refresh();

            RewriteTablePath.Result result = this.actions()
                    .rewriteTablePath(this.table)
                    .rewriteLocationPrefix(this.table.location(), this.newTableLocation())
                    .stagingLocation(this.stagingLocation())
                    .execute();
            this.checkFileNum(5, 3, 3, 14, result);
        } finally {
            // Restore the session even when an assertion or the rewrite fails.
            spark.conf().unset("spark.wap.id");
        }
    }

    /** Adds a string column {@code c4} to the table and checks the counts of a full rewrite. */
    @Test
    public void testSchemaChange() throws Exception {
        table.updateSchema()
                .addColumn("c4", Types.StringType.get())
                .commit();

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(table)
                .rewriteLocationPrefix(table.location(), newTableLocation())
                .stagingLocation(stagingLocation())
                .execute();
        checkFileNum(4, 2, 2, 10, result);
    }

    /**
     * Creates a two-snapshot table with
     * {@code compatibility.snapshot-id-inheritance.enabled=true} and checks the counts of a
     * full rewrite.
     */
    @Test
    public void testSnapshotIdInheritanceEnabled() throws Exception {
        String sourceTableLocation = newTableLocation();

        Map<String, String> tableProperties = Maps.newHashMap();
        tableProperties.put("compatibility.snapshot-id-inheritance.enabled", "true");
        Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, tableProperties);

        RewriteTablePath.Result result = actions()
                .rewriteTablePath(sourceTable)
                .stagingLocation(stagingLocation())
                .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
                .execute();
        checkFileNum(3, 2, 2, 9, result);
    }

    /**
     * Creates a two-snapshot table with gzip metadata compression and checks the counts of two
     * rewrites: one ending at {@code v2.gz.metadata.json} and one starting at
     * {@code v1.gz.metadata.json}.
     */
    @Test
    public void testMetadataCompression() throws Exception {
        String sourceTableLocation = newTableLocation();

        Map<String, String> tableProperties = Maps.newHashMap();
        tableProperties.put("write.metadata.compression-codec", "gzip");
        Table sourceTable = createTableWithSnapshots(sourceTableLocation, 2, tableProperties);

        RewriteTablePath.Result upToV2 = actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
                .endVersion("v2.gz.metadata.json")
                .execute();
        checkFileNum(2, 1, 1, 5, upToV2);

        RewriteTablePath.Result fromV1 = actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
                .startVersion("v1.gz.metadata.json")
                .execute();
        checkFileNum(2, 2, 2, 8, fromV1);
    }

    /**
     * Checks that empty, blank or {@code null} arguments to the action's setters are rejected
     * with an {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testInvalidArgs() {
        RewriteTablePath rewriteAction = actions().rewriteTablePath(table);

        // Source prefix
        Assertions.assertThatThrownBy(() -> rewriteAction.rewriteLocationPrefix("", null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Source prefix('') cannot be empty");
        Assertions.assertThatThrownBy(() -> rewriteAction.rewriteLocationPrefix(null, null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Source prefix('null') cannot be empty");

        // Staging location
        Assertions.assertThatThrownBy(() -> rewriteAction.stagingLocation(""))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Staging location('') cannot be empty");
        Assertions.assertThatThrownBy(() -> rewriteAction.stagingLocation(null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Staging location('null') cannot be empty");

        // Start version
        Assertions.assertThatThrownBy(() -> rewriteAction.startVersion(null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Start version('null') cannot be empty");

        // End version
        Assertions.assertThatThrownBy(() -> rewriteAction.endVersion(" "))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("End version(' ') cannot be empty");
        Assertions.assertThatThrownBy(() -> rewriteAction.endVersion(null))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("End version('null') cannot be empty");
    }

    /**
     * Overwrites the current metadata file of a metastore table with a copy that references a
     * partition statistics file, and checks that the rewrite is rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testPartitionStatisticFile() throws IOException {
        String sourceTableLocation = newTableLocation();
        Map<String, String> tableProperties = Maps.newHashMap();
        tableProperties.put("format-version", "2");
        String tableName = "v2tblwithPartStats";
        Table sourceTable = createMetastoreTable(sourceTableLocation, tableProperties, "default", tableName, 0);

        TableMetadata metadata = currentMetadata(sourceTable);
        TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(metadata);
        PartitionStatisticsFile partitionStats = ImmutableGenericPartitionStatisticsFile.builder()
                .snapshotId(11L)
                .path("/some/partition/stats/file.parquet")
                .fileSizeInBytes(42L)
                .build();
        TableMetadata withPartStatistics = metadataBuilder.setPartitionStatistics(partitionStats).build();

        // Replace the metadata file in place so the table now carries partition statistics.
        OutputFile metadataFile = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
        TableMetadataParser.overwrite(withPartStatistics, metadataFile);

        Assertions.assertThatThrownBy(
                        () -> actions()
                                .rewriteTablePath(sourceTable)
                                .rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
                                .execute())
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Partition statistics files are not supported yet");
    }

    /**
     * Inserts one row and computes table statistics ten times, then rewrites the table path and
     * checks the number of files of each kind reported in the resulting file list.
     */
    @Test
    public void testTableWithManyStatisticFiles() throws IOException {
        final int iterations = 10;
        String tableName = "v2tblwithmanystats";
        String sourceTableLocation = this.newTableLocation();
        Map<String, String> properties = Maps.newHashMap();
        properties.put("format-version", "2");
        Table sourceTable = this.createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);

        // Each round adds one snapshot and one statistics file.
        for (int round = 0; round < iterations; round++) {
            this.sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, round);
            sourceTable.refresh();
            this.actions().computeTableStats(sourceTable).execute();
        }
        sourceTable.refresh();
        Assertions.assertThat(sourceTable.statisticsFiles().size()).isEqualTo(iterations);

        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(sourceTableLocation, this.targetTableLocation())
                .execute();

        int expectedVersionFiles = iterations * 2 + 1;
        int expectedTotalFiles = iterations * 6 + 1;
        this.checkFileNum(expectedVersionFiles, iterations, iterations, iterations, expectedTotalFiles, result);
    }

    /**
     * Rewrites a table whose metadata files are written with the gzip codec, first up to an end
     * version and then from a start version, and checks the rebuilt file counts each time.
     */
    @Test
    public void testMetadataCompressionWithMetastoreTable() throws Exception {
        Map<String, String> properties = Maps.newHashMap();
        properties.put("write.metadata.compression-codec", "gzip");
        Table sourceTable = this.createMetastoreTable(this.newTableLocation(), properties, "default", "testMetadataCompression", 2);
        TableMetadata currentMetadata = this.currentMetadata(sourceTable);

        // First pass: stop at the second entry of the metadata log.
        TableMetadata.MetadataLogEntry secondEntry = currentMetadata.previousFiles().get(1);
        String endVersion = fileName(secondEntry.file());
        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(this.newTableLocation(), this.targetTableLocation())
                .endVersion(endVersion)
                .execute();
        this.checkFileNum(2, 1, 1, 5, result);

        // Second pass: start from the first entry of the metadata log.
        TableMetadata.MetadataLogEntry firstEntry = currentMetadata.previousFiles().get(0);
        String firstVersion = fileName(firstEntry.file());
        result = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(this.newTableLocation(), this.targetTableLocation())
                .startVersion(firstVersion)
                .execute();
        this.checkFileNum(2, 2, 2, 8, result);
    }

    /**
     * Changes {@code write.metadata.path} after the first commit, adds another snapshot, and
     * checks the rebuilt file counts for a full rewrite as well as for rewrites bounded by the
     * metadata file that was current before the property change.
     */
    @Test
    public void testMetadataLocationChange() throws Exception {
        Table sourceTable = this.createMetastoreTable(this.newTableLocation(), Maps.newHashMap(), "default", "tbl", 1);
        String metadataFilePath = this.currentMetadata(sourceTable).metadataFileLocation();

        String newMetadataDir = "new-metadata-dir";
        String newMetadataPath = this.newTableLocation() + newMetadataDir;
        sourceTable.updateProperties().set("write.metadata.path", newMetadataPath).commit();
        spark.sql("insert into hive.default.tbl values (1, 'AAAAAAAAAA', 'AAAA')");
        sourceTable.refresh();

        // Unbounded rewrite.
        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(this.newTableLocation(), this.targetTableLocation())
                .execute();
        this.checkFileNum(4, 2, 2, 10, result);

        // Rewrite ending at the metadata file captured before the property change.
        RewriteTablePath.Result result1 = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(this.newTableLocation(), this.targetTableLocation())
                .endVersion(fileName(metadataFilePath))
                .execute();
        this.checkFileNum(2, 1, 1, 5, result1);

        // Rewrite starting from that same metadata file.
        RewriteTablePath.Result result2 = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(this.newTableLocation(), this.targetTableLocation())
                .startVersion(fileName(metadataFilePath))
                .execute();
        this.checkFileNum(2, 1, 1, 5, result2);
    }

    /**
     * Writes three rows to a format-version 2 table configured for merge-on-read deletes, deletes
     * one row, rewrites the table path, copies the listed files, registers the copy as a new table
     * and checks that both tables return the same rows.
     */
    @Test
    public void testDeleteFrom() throws Exception {
        String tableName = "v2tbl";
        Map<String, String> properties = Maps.newHashMap();
        properties.put("format-version", "2");
        properties.put("write.delete.mode", "merge-on-read");
        Table sourceTable = this.createMetastoreTable(this.newTableLocation(), properties, "default", tableName, 0);

        List<ThreeColumnRecord> records = Lists.newArrayList(
                new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
                new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"),
                new ThreeColumnRecord(3, "AAAAAAAAAA", "AAAA"));
        spark.createDataFrame(records, ThreeColumnRecord.class)
                .coalesce(1)
                .select("c1", "c2", "c3")
                .write()
                .format("iceberg")
                .mode("append")
                .saveAsTable("hive.default." + tableName);
        sourceTable.refresh();

        spark.sql(String.format("delete from hive.default.%s where c1 = 1", tableName));
        sourceTable.refresh();

        List<Object[]> originalData = this.rowsToJava(
                spark.read().format("iceberg").load("hive.default." + tableName).sort("c1", "c2", "c3").collectAsList());
        Assertions.assertThat(originalData.size()).isEqualTo(2);

        RewriteTablePath.Result result = this.actions()
                .rewriteTablePath(sourceTable)
                .rewriteLocationPrefix(this.newTableLocation(), this.targetTableLocation())
                .execute();
        this.checkFileNum(3, 2, 2, 9, result);
        this.copyTableFiles(result);

        // Register the copied files as a separate table, using the same metadata file name as the source.
        String metadataLocation = this.currentMetadata(sourceTable).metadataFileLocation();
        String versionFile = fileName(metadataLocation);
        String targetTableName = "copiedV2Table";
        TableIdentifier tableIdentifier = TableIdentifier.of("default", targetTableName);
        catalog.registerTable(tableIdentifier, this.targetTableLocation() + "/metadata/" + versionFile);

        List<Object[]> copiedData = this.rowsToJava(
                spark.read().format("iceberg").load("hive.default." + targetTableName).sort("c1", "c2", "c3").collectAsList());
        this.assertEquals("Rows must match", originalData, copiedData);
    }

    /**
     * Removes the action's table broadcast from the local block manager and checks that the
     * broadcast value can still be read and reports the same table UUID.
     */
    @Test
    public void testKryoDeserializeBroadcastValues() {
        // NOTE(review): if getConf() hands back a copy of the configuration, this setting has no
        // effect on the running context - confirm.
        sparkContext.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        RewriteTablePathSparkAction action = (RewriteTablePathSparkAction) this.actions().rewriteTablePath(this.table);
        Broadcast<Table> tableBroadcast = action.tableBroadcast();

        // Drop the locally held copy before reading the value back.
        this.removeBroadcastValuesFromLocalBlockManager(tableBroadcast.id());

        Assertions.assertThat(tableBroadcast.getValue().uuid()).isEqualTo(this.table.uuid());
    }

    /**
     * Convenience overload of the six-argument {@code checkFileNum} that expects zero statistics
     * files in the rewrite result.
     *
     * @param versionFileCount expected number of metadata JSON files
     * @param manifestListCount expected number of manifest list files
     * @param manifestFileCount expected number of manifest files
     * @param totalCount expected total number of listed files
     * @param result rewrite result whose file list is inspected
     */
    protected void checkFileNum(int versionFileCount, int manifestListCount, int manifestFileCount, int totalCount, RewriteTablePath.Result result) {
        final int noStatisticsFiles = 0;
        checkFileNum(versionFileCount, manifestListCount, manifestFileCount, noStatisticsFiles, totalCount, result);
    }

    /**
     * Reads the file list produced by a rewrite as plain text lines and asserts how many lines
     * fall into each file category, as well as the total number of lines.
     *
     * <p>Categories are decided purely by suffix/substring matching on each line:
     * {@code .metadata.json} for version files, {@code snap-*.avro} for manifest lists,
     * {@code -m0.avro}/{@code -m1.avro} for manifests and {@code .stats} for statistics files.
     *
     * @param versionFileCount expected number of metadata JSON files
     * @param manifestListCount expected number of manifest list files
     * @param manifestFileCount expected number of manifest files
     * @param statisticsFileCount expected number of statistics files
     * @param totalCount expected total number of listed files
     * @param result rewrite result whose {@code fileListLocation()} is read
     */
    protected void checkFileNum(int versionFileCount, int manifestListCount, int manifestFileCount, int statisticsFileCount, int totalCount, RewriteTablePath.Result result) {
        List<String> filesToMove = spark.read().format("text").load(result.fileListLocation()).as(Encoders.STRING()).collectAsList();
        Predicate<String> isManifest = f -> f.endsWith("-m0.avro") || f.endsWith("-m1.avro");
        Predicate<String> isManifestList = f -> f.contains("snap-") && f.endsWith(".avro");
        Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
        Predicate<String> isStatistics = f -> f.endsWith(".stats");

        // Every check uses as(...) so a failure still reports the actual and expected counts.
        Assertions.assertThat(filesToMove.stream().filter(isMetadataJSON).count())
                .as("Wrong rebuilt version file count")
                .isEqualTo((long) versionFileCount);
        Assertions.assertThat(filesToMove.stream().filter(isManifestList).count())
                .as("Wrong rebuilt Manifest list file count")
                .isEqualTo((long) manifestListCount);
        Assertions.assertThat(filesToMove.stream().filter(isManifest).count())
                .as("Wrong rebuilt Manifest file count")
                .isEqualTo((long) manifestFileCount);
        Assertions.assertThat(filesToMove.stream().filter(isStatistics).count())
                .as("Wrong rebuilt Statistic file count")
                .isEqualTo((long) statisticsFileCount);
        Assertions.assertThat(filesToMove.size())
                .as("Wrong total file count")
                .isEqualTo(totalCount);
    }

    /** Returns {@code newTableDir} rendered by {@code toAbsolute}. */
    protected String newTableLocation() throws IOException {
        return toAbsolute(newTableDir);
    }

    /** Returns {@code targetTableDir} rendered by {@code toAbsolute}. */
    protected String targetTableLocation() throws IOException {
        return toAbsolute(targetTableDir);
    }

    /** Returns the {@code staging} directory rendered by {@code toAbsolute}. */
    protected String stagingLocation() throws IOException {
        return toAbsolute(staging);
    }

    /**
     * Converts a path to the string form of its {@code file:} URI.
     *
     * @param relative path to convert
     * @return the string form of {@code relative.toFile().toURI()}
     */
    protected String toAbsolute(Path relative) throws IOException {
        // Deliberately goes through java.io.File: Path.toUri() renders the URI differently.
        File asFile = relative.toFile();
        URI asUri = asFile.toURI();
        return asUri.toString();
    }

    /**
     * Copies every file named in the rewrite result's file list from its first-column location
     * to its second-column location.
     *
     * @param result rewrite result whose {@code fileListLocation()} holds the path pairs
     */
    private void copyTableFiles(RewriteTablePath.Result result) throws Exception {
        for (Tuple2<String, String> sourceAndTarget : this.readPathPairList(result.fileListLocation())) {
            File from = new File(URI.create(sourceAndTarget._1()));
            File to = new File(URI.create(sourceAndTarget._2()));
            FileUtils.copyFile(from, to);
        }
    }

    /**
     * Returns the part of {@code path} after its last {@code ':'}, or {@code path} unchanged when
     * it contains no colon.
     */
    private String removePrefix(String path) {
        int lastColon = path.lastIndexOf(':');
        if (lastColon < 0) {
            return path;
        }
        return path.substring(lastColon + 1);
    }

    /**
     * Builds a {@link BaseTable} backed by {@link StaticTableOperations} for the given metadata file.
     *
     * @param metadataFileLocation metadata file to load; also used as the table name
     * @param io file IO handed to the static operations
     */
    protected Table newStaticTable(String metadataFileLocation, FileIO io) {
        TableOperations staticOps = new StaticTableOperations(metadataFileLocation, io);
        Table staticTable = new BaseTable(staticOps, metadataFileLocation);
        return staticTable;
    }

    /**
     * Loads the CSV at {@code path} as two string columns and collects the rows as pairs.
     *
     * @param path location of the CSV file list
     * @return one tuple per CSV row
     */
    private List<Tuple2<String, String>> readPathPairList(String path) {
        Encoder<Tuple2<String, String>> pairEncoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING());
        return spark.read()
                .format("csv")
                .schema(pairEncoder.schema())
                .load(path)
                .as(pairEncoder)
                .collectAsList();
    }

    /**
     * Configures the {@code hive} Spark catalog, (re)creates a three-column table in it and
     * inserts {@code snapshotNumber} single-row commits.
     *
     * @param location table location; when empty, the {@code USING iceberg LOCATION} clause is omitted
     * @param properties table properties; when empty, the {@code TBLPROPERTIES} clause is omitted
     * @param namespace namespace of the table inside the {@code hive} catalog
     * @param tableName name of the table to create
     * @param snapshotNumber number of insert statements to run after creation
     * @return the table as loaded from {@code catalog}
     */
    private Table createMetastoreTable(String location, Map<String, String> properties, String namespace, String tableName, int snapshotNumber) {
        spark.conf().set("spark.sql.catalog.hive", SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog.hive.type", "hive");
        spark.conf().set("spark.sql.catalog.hive.default-namespace", "default");
        spark.conf().set("spark.sql.catalog.hive.cache-enabled", "false");

        // Render the properties as 'key'='value' pairs separated by commas.
        String tblProperties = properties.entrySet().stream()
                .map(entry -> "'" + entry.getKey() + "'='" + entry.getValue() + "'")
                .collect(Collectors.joining(","));

        this.sql("DROP TABLE IF EXISTS hive.%s.%s", namespace, tableName);

        // Build the DDL once; the optional clauses are appended only when they have content.
        StringBuilder ddl = new StringBuilder(
                String.format("CREATE TABLE hive.%s.%s (c1 bigint, c2 string, c3 string)", namespace, tableName));
        if (!location.isEmpty()) {
            ddl.append(String.format(" USING iceberg LOCATION '%s'", location));
        }
        if (!tblProperties.isEmpty()) {
            ddl.append(String.format(" TBLPROPERTIES (%s)", tblProperties));
        }
        this.sql(ddl.toString(), new Object[0]);

        for (int i = 0; i < snapshotNumber; ++i) {
            this.sql("insert into hive.%s.%s values (%s, 'AAAAAAAAAA', 'AAAA')", namespace, tableName, i);
        }
        return catalog.loadTable(TableIdentifier.of(namespace, tableName));
    }

    /**
     * Returns the last segment of {@code path}, i.e. everything after its final separator.
     *
     * @param path a file location, either URI-style or a native file system path
     * @return the text after the last separator, or {@code path} itself when it has none
     */
    private static String fileName(String path) {
        // Metadata locations are URI-style strings that use '/' on every platform, so '/' is
        // always treated as a separator; the platform separator is still honoured for native paths.
        int lastIndex = Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar));
        return lastIndex == -1 ? path : path.substring(lastIndex + 1);
    }

    /**
     * Returns the current metadata of {@code tbl} through its table operations.
     *
     * @param tbl a table that also implements {@link HasTableOperations}
     */
    private TableMetadata currentMetadata(Table tbl) {
        TableOperations ops = ((HasTableOperations) tbl).operations();
        return ops.current();
    }

    /** Reads the Iceberg table at {@code location} and converts the collected rows with {@code rowsToJava}. */
    private List<Object[]> rows(String location) {
        List<Object[]> converted = rowsToJava(
                spark.read()
                        .format("iceberg")
                        .load(location)
                        .collectAsList());
        return converted;
    }

    /**
     * Reads the Iceberg table at {@code location} sorted by {@code sortCol} and converts the
     * collected rows with {@code rowsToJava}.
     */
    private List<Object[]> rowsSorted(String location, String sortCol) {
        List<Object[]> converted = rowsToJava(
                spark.read()
                        .format("iceberg")
                        .load(location)
                        .sort(sortCol, new String[0])
                        .collectAsList());
        return converted;
    }

    /**
     * Creates a position delete for {@code path} at {@code position} whose row is a
     * {@link GenericRecord} of {@code tableSchema} filled positionally from {@code values}.
     *
     * @param tableSchema schema used to create the row record
     * @param path path stored in the delete
     * @param position position stored in the delete; must not be {@code null}
     * @param values field values, assigned to record positions 0..n-1 in order
     */
    private PositionDelete<GenericRecord> positionDelete(Schema tableSchema, CharSequence path, Long position, Object ... values) {
        GenericRecord row = GenericRecord.create(tableSchema);
        int fieldPos = 0;
        for (Object value : values) {
            row.set(fieldPos, value);
            fieldPos++;
        }
        PositionDelete<GenericRecord> posDelete = PositionDelete.create();
        posDelete.set(path, position.longValue(), row);
        return posDelete;
    }

    /**
     * Clears the broadcast manager's cached values and removes the broadcast block with the given
     * id from the local block manager's bookkeeping and memory store.
     *
     * @param id id of the broadcast whose local block is removed
     */
    private void removeBroadcastValuesFromLocalBlockManager(long id) {
        SparkEnv sparkEnv = SparkEnv.get();
        BroadcastBlockId broadcastBlock = new BroadcastBlockId(id, "");

        sparkEnv.broadcastManager().cachedValues().clear();

        BlockManager localBlocks = sparkEnv.blockManager();
        BlockInfoManager infoManager = localBlocks.blockInfoManager();
        // The write lock is taken before the block is removed; keep these three steps in this order.
        infoManager.lockForWriting(broadcastBlock, true);
        infoManager.removeBlock(broadcastBlock);
        localBlocks.memoryStore().remove(broadcastBlock);
    }
}

