/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.util.DataFileSet;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests for the Spark file-rewrite handshake.
 *
 * <p>Each test follows the same flow:
 * <ol>
 *   <li>Scan tasks are staged in {@link ScanTaskSetManager} under a random file-set ID.</li>
 *   <li>They are read back through the {@code scan-task-set-id} read option.</li>
 *   <li>The result is written with the {@code rewritten-file-scan-task-set-id} write option.</li>
 *   <li>The newly written files are fetched from {@link FileRewriteCoordinator} and committed as
 *       a rewrite of the staged files.</li>
 * </ol>
 */
public class TestFileRewriteCoordinator extends CatalogTestBase {

    /** Drops the table created by each test. */
    @AfterEach
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
    }

    /**
     * Bin-packs four single-file appends into two files.
     *
     * <p>The files are read back with a split size of twice the average data file size and a zero
     * file-open cost.
     */
    @TestTemplate
    public void testBinPackRewrite() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
        appendRepeatedly(newDF(1000), 4);

        Table table = validationCatalog.loadTable(tableIdent);
        Assertions.assertThat(table.snapshots()).as("Should produce 4 snapshots").hasSize(4);

        // Average data file size, read from the ".files" metadata table.
        Dataset<Row> fileDF =
                spark.read().format("iceberg").load(tableName(tableIdent.name() + ".files"));
        List<Long> fileSizes =
                fileDF.select("file_size_in_bytes").as(Encoders.LONG()).collectAsList();
        long avgFileSize = fileSizes.stream().mapToLong(Long::longValue).sum() / fileSizes.size();

        String fileSetID = UUID.randomUUID().toString();
        ScanTaskSetManager.get()
                .stageTasks(table, fileSetID, collectAndClose(table.newScan().planFiles()));

        // Splits of twice the average size with no per-file open cost are meant to pack the four
        // files two per split, which is why two added files are expected below.
        Dataset<Row> scanDF = spark.read()
                .format("iceberg")
                .option("scan-task-set-id", fileSetID)
                .option("split-size", Long.toString(avgFileSize * 2L))
                .option("file-open-cost", "0")
                .load(tableName);
        scanDF.writeTo(tableName).option("rewritten-file-scan-task-set-id", fileSetID).append();

        commitRewrite(table, ImmutableSet.of(fileSetID));

        table.refresh();
        assertRewriteResult(table, 4, 2, 4000L);
    }

    /**
     * Rewrites four single-file appends through a sort on {@code id}.
     *
     * <p>The write runs with two shuffle partitions and adaptive query execution disabled. Two
     * output files are expected.
     */
    @TestTemplate
    public void testSortRewrite() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
        appendRepeatedly(newDF(1000), 4);

        Table table = validationCatalog.loadTable(tableIdent);
        Assertions.assertThat(table.snapshots()).as("Should produce 4 snapshots").hasSize(4);

        String fileSetID = UUID.randomUUID().toString();
        ScanTaskSetManager.get()
                .stageTasks(table, fileSetID, collectAndClose(table.newScan().planFiles()));

        // 134217728 bytes = 128 MiB, used for both split size and per-file open cost.
        Dataset<Row> scanDF = spark.read()
                .format("iceberg")
                .option("scan-task-set-id", fileSetID)
                .option("split-size", "134217728")
                .option("file-open-cost", "134217728")
                .load(tableName);

        Map<String, String> sqlConf = ImmutableMap.of(
                "spark.sql.shuffle.partitions", "2",
                "spark.sql.adaptive.enabled", "false");
        withSQLConf(sqlConf, () -> {
            try {
                scanDF.sort("id")
                        .writeTo(tableName)
                        .option("rewritten-file-scan-task-set-id", fileSetID)
                        .append();
            } catch (NoSuchTableException e) {
                // NOTE(review): presumably wrapped because the withSQLConf callback type does not
                // declare checked exceptions; confirm against the base class.
                throw new RuntimeException("Could not replace files", e);
            }
        });

        commitRewrite(table, ImmutableSet.of(fileSetID));

        table.refresh();
        assertRewriteResult(table, 4, 2, 4000L);
    }

    /**
     * Stages two independent file sets of two files each and rewrites each set into one file.
     *
     * <p>Both rewrites are then committed together in a single {@code newRewrite()} operation.
     */
    @TestTemplate
    public void testCommitMultipleRewrites() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
        Dataset<Row> df = newDF(1000);
        appendRepeatedly(df, 2);

        Table table = validationCatalog.loadTable(tableIdent);
        ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();

        // First file set: every file written so far.
        String firstFileSetID = UUID.randomUUID().toString();
        long firstFileSetSnapshotId = table.currentSnapshot().snapshotId();
        taskSetManager.stageTasks(
                table, firstFileSetID, collectAndClose(table.newScan().planFiles()));

        appendRepeatedly(df, 2);
        table.refresh();

        // Second file set: only the files appended after the first set's snapshot.
        String secondFileSetID = UUID.randomUUID().toString();
        taskSetManager.stageTasks(
                table,
                secondFileSetID,
                collectAndClose(table.newScan().appendsAfter(firstFileSetSnapshotId).planFiles()));

        Set<String> fileSetIDs = ImmutableSet.of(firstFileSetID, secondFileSetID);
        for (String fileSetID : fileSetIDs) {
            // An unbounded split size is meant to pack each set's files into a single split.
            Dataset<Row> scanDF = spark.read()
                    .format("iceberg")
                    .option("scan-task-set-id", fileSetID)
                    .option("split-size", Long.MAX_VALUE)
                    .load(tableName);
            scanDF.writeTo(tableName).option("rewritten-file-scan-task-set-id", fileSetID).append();
        }

        // Both file sets are replaced in one commit.
        commitRewrite(table, fileSetIDs);

        table.refresh();
        Assertions.assertThat(table.snapshots()).as("Should produce 5 snapshots").hasSize(5);
        assertRewriteResult(table, 4, 2, 4000L);
    }

    /**
     * Appends {@code df} to the test table {@code times} times.
     *
     * <p>Each write is coalesced to a single partition, so each append is expected to write one
     * data file.
     */
    private void appendRepeatedly(Dataset<Row> df, int times) throws NoSuchTableException {
        for (int write = 0; write < times; write++) {
            df.coalesce(1).writeTo(tableName).append();
        }
    }

    /**
     * Copies every element of {@code iterable} into a new list and then closes the iterable.
     *
     * @param iterable the iterable to drain, e.g. the result of {@code planFiles()}
     * @return a list holding all elements in iteration order
     * @throws IOException if closing the iterable fails
     */
    private static <T> List<T> collectAndClose(CloseableIterable<T> iterable) throws IOException {
        try (CloseableIterable<T> closeable = iterable) {
            return Lists.newArrayList(closeable);
        }
    }

    /**
     * Commits one rewrite covering every given file set.
     *
     * <p>For each file-set ID, this gathers the data files of the tasks staged in
     * {@link ScanTaskSetManager} and the new files recorded by {@link FileRewriteCoordinator}. It
     * then replaces the former with the latter in a single {@code newRewrite()} commit.
     *
     * @param table the table to rewrite
     * @param fileSetIDs IDs of previously staged and rewritten file sets
     */
    private void commitRewrite(Table table, Set<String> fileSetIDs) {
        ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
        FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();

        Set<DataFile> rewrittenFiles = fileSetIDs.stream()
                .flatMap(fileSetID -> taskSetManager.fetchTasks(table, fileSetID).stream())
                .map(task -> task.asFileScanTask().file())
                .collect(Collectors.toCollection(DataFileSet::create));
        Set<DataFile> addedFiles = fileSetIDs.stream()
                .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream())
                .collect(Collectors.toCollection(DataFileSet::create));

        table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
    }

    /**
     * Checks the current snapshot's summary and the table's total row count.
     *
     * <p>Callers refresh {@code table} before calling this.
     *
     * @param table the rewritten table
     * @param deletedFiles expected {@code deleted-data-files} summary value
     * @param addedFiles expected {@code added-data-files} summary value
     * @param rowCount expected result of {@code SELECT count(*)} on the test table
     */
    private void assertRewriteResult(Table table, int deletedFiles, int addedFiles, long rowCount) {
        Map<String, String> summary = table.currentSnapshot().summary();
        Assertions.assertThat(summary.get("deleted-data-files"))
                .as("Deleted files count must match")
                .isEqualTo(String.valueOf(deletedFiles));
        Assertions.assertThat(summary.get("added-data-files"))
                .as("Added files count must match")
                .isEqualTo(String.valueOf(addedFiles));

        Object actualRowCount = scalarSql("SELECT count(*) FROM %s", tableName);
        Assertions.assertThat(actualRowCount).as("Row count must match").isEqualTo(rowCount);
    }

    /**
     * Builds a DataFrame of {@code numRecords} {@link SimpleRecord} rows.
     *
     * <p>Ids run from 0 to {@code numRecords - 1}; each row's data is its id rendered as a string.
     */
    private Dataset<Row> newDF(int numRecords) {
        List<SimpleRecord> data = Lists.newArrayListWithExpectedSize(numRecords);
        for (int index = 0; index < numRecords; index++) {
            data.add(new SimpleRecord(index, Integer.toString(index)));
        }
        return spark.createDataFrame(data, SimpleRecord.class);
    }
}

