/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests rewriting table files through Spark: data files are staged under a file set ID with
 * {@link FileScanTaskSetManager}, read back and rewritten by Spark, and the replacement files
 * reported by {@link FileRewriteCoordinator} are committed with {@code Table#newRewrite()}.
 */
public class TestFileRewriteCoordinator
extends SparkCatalogTestBase {
    public TestFileRewriteCoordinator(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
    }

    /**
     * Rewrites four single-file appends into two files by reading them back with a split size of
     * twice the average file size, then commits the rewrite.
     */
    @Test
    public void testBinPackRewrite() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
        appendSingleFiles(newDF(1000), 4);

        Table table = validationCatalog.loadTable(tableIdent);
        Assert.assertEquals("Should produce 4 snapshots", 4L, Iterables.size(table.snapshots()));

        Dataset<Row> fileDF = spark.read().format("iceberg").load(tableName(tableIdent.name() + ".files"));
        List<Long> fileSizes = fileDF.select("file_size_in_bytes").as(Encoders.LONG()).collectAsList();
        // Pins the input file count and keeps the average below from dividing by zero.
        Assert.assertEquals("Should have 4 data files", 4L, fileSizes.size());
        long avgFileSize = fileSizes.stream().mapToLong(Long::longValue).sum() / fileSizes.size();

        String fileSetID = stageTasks(table, table.newScan().planFiles());

        // A split size of two average files with no open cost is expected to pack 4 files into 2 splits.
        Dataset<Row> scanDF = spark.read().format("iceberg")
            .option("file-scan-task-set-id", fileSetID)
            .option("split-size", Long.toString(avgFileSize * 2L))
            .option("file-open-cost", "0")
            .load(tableName);
        // Writing with the rewritten set ID registers the new files for that file set.
        scanDF.writeTo(tableName).option("rewritten-file-scan-task-set-id", fileSetID).append();

        commitRewrite(table, ImmutableSet.of(fileSetID));

        table.refresh();
        assertRewriteSummary(table);
    }

    /**
     * Rewrites four single-file appends by sorting on {@code id} with two shuffle partitions, then
     * commits the rewrite.
     */
    @Test
    public void testSortRewrite() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
        appendSingleFiles(newDF(1000), 4);

        Table table = validationCatalog.loadTable(tableIdent);
        Assert.assertEquals("Should produce 4 snapshots", 4L, Iterables.size(table.snapshots()));

        String fileSetID = stageTasks(table, table.newScan().planFiles());

        // 134217728 bytes = 128 MiB for both split size and open cost.
        Dataset<Row> scanDF = spark.read().format("iceberg")
            .option("file-scan-task-set-id", fileSetID)
            .option("split-size", "134217728")
            .option("file-open-cost", "134217728")
            .load(tableName);

        // Two shuffle partitions with adaptive execution off: the sort output is expected as 2 files.
        Map<String, String> sqlConf = ImmutableMap.of(
            "spark.sql.shuffle.partitions", "2",
            "spark.sql.adaptive.enabled", "false");
        withSQLConf(sqlConf, () -> {
            try {
                scanDF.sort("id").writeTo(tableName).option("rewritten-file-scan-task-set-id", fileSetID).append();
            }
            catch (NoSuchTableException e) {
                throw new RuntimeException("Could not replace files", e);
            }
        });

        commitRewrite(table, ImmutableSet.of(fileSetID));

        table.refresh();
        assertRewriteSummary(table);
    }

    /**
     * Stages two independent file sets (the first two appends, and the appends after them),
     * rewrites each into one file, and commits both rewrites in a single snapshot.
     */
    @Test
    public void testCommitMultipleRewrites() throws NoSuchTableException, IOException {
        sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
        Dataset<Row> df = newDF(1000);

        // First file set: every file present after the first two appends.
        appendSingleFiles(df, 2);
        Table table = validationCatalog.loadTable(tableIdent);
        long firstFileSetSnapshotId = table.currentSnapshot().snapshotId();
        String firstFileSetID = stageTasks(table, table.newScan().planFiles());

        // Second file set: only the files appended after the first set's snapshot.
        appendSingleFiles(df, 2);
        table.refresh();
        String secondFileSetID = stageTasks(
            table, table.newScan().appendsAfter(firstFileSetSnapshotId).planFiles());

        Set<String> fileSetIDs = ImmutableSet.of(firstFileSetID, secondFileSetID);
        for (String fileSetID : fileSetIDs) {
            // An unbounded split size is expected to read each file set as one split, i.e. one new file.
            Dataset<Row> scanDF = spark.read().format("iceberg")
                .option("file-scan-task-set-id", fileSetID)
                .option("split-size", Long.MAX_VALUE)
                .load(tableName);
            scanDF.writeTo(tableName).option("rewritten-file-scan-task-set-id", fileSetID).append();
        }

        // Both file sets are replaced by one rewrite commit.
        commitRewrite(table, fileSetIDs);

        table.refresh();
        Assert.assertEquals("Should produce 5 snapshots", 5L, Iterables.size(table.snapshots()));
        assertRewriteSummary(table);
    }

    /** Appends {@code df} to the test table {@code times} times, coalescing each append to one partition. */
    private void appendSingleFiles(Dataset<Row> df, int times) throws NoSuchTableException {
        for (int i = 0; i < times; ++i) {
            df.coalesce(1).writeTo(tableName).append();
        }
    }

    /**
     * Stages every task in {@code tasks} under a fresh random file set ID, closing {@code tasks}
     * afterwards.
     *
     * @param table the table the tasks were planned from
     * @param tasks planned scan tasks; this method takes ownership and closes them
     * @return the generated file set ID
     * @throws IOException if closing {@code tasks} fails
     */
    private String stageTasks(Table table, CloseableIterable<FileScanTask> tasks) throws IOException {
        try (CloseableIterable<FileScanTask> planned = tasks) {
            String fileSetID = UUID.randomUUID().toString();
            FileScanTaskSetManager.get().stageTasks(table, fileSetID, Lists.newArrayList(planned));
            return fileSetID;
        }
    }

    /**
     * Commits one rewrite that replaces the files staged under each ID in {@code fileSetIDs} with
     * the new data files recorded for those IDs by {@link FileRewriteCoordinator}.
     */
    private void commitRewrite(Table table, Set<String> fileSetIDs) {
        FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
        FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
        Set<DataFile> rewrittenFiles = fileSetIDs.stream()
            .flatMap(fileSetID -> taskSetManager.fetchTasks(table, fileSetID).stream())
            .map(FileScanTask::file)
            .collect(Collectors.toSet());
        Set<DataFile> addedFiles = fileSetIDs.stream()
            .flatMap(fileSetID -> rewriteCoordinator.fetchNewDataFiles(table, fileSetID).stream())
            .collect(Collectors.toSet());
        table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
    }

    /**
     * Asserts that the current snapshot of {@code table} deleted 4 data files and added 2, and that
     * the table still holds 4000 rows. Callers refresh {@code table} first.
     */
    private void assertRewriteSummary(Table table) {
        Map<String, String> summary = table.currentSnapshot().summary();
        Assert.assertEquals("Deleted files count must match", "4", summary.get("deleted-data-files"));
        Assert.assertEquals("Added files count must match", "2", summary.get("added-data-files"));
        Object rowCount = scalarSql("SELECT count(*) FROM %s", tableName);
        Assert.assertEquals("Row count must match", 4000L, rowCount);
    }

    /** Builds a DataFrame of {@code numRecords} records with ids {@code 0..numRecords-1} and data = id as text. */
    private Dataset<Row> newDF(int numRecords) {
        List<SimpleRecord> data = Lists.newArrayListWithExpectedSize(numRecords);
        for (int index = 0; index < numRecords; ++index) {
            data.add(new SimpleRecord(index, Integer.toString(index)));
        }
        return spark.createDataFrame(data, SimpleRecord.class);
    }
}

