/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction;
import org.apache.iceberg.spark.actions.RewritePositionDeleteFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.extensions.SparkExtensionsTestBase;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

public class TestRewritePositionDeleteFiles
extends SparkExtensionsTestBase {
    /** Hive catalog properties passed to the base class for every run; catalog caching is switched off. */
    private static final Map<String, String> CATALOG_PROPS = ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false");
    /** Name of the partition column used by tests that do not supply their own. */
    private static final String PARTITION_COL = "partition_col";
    /** Default number of data files a test writes. */
    private static final int NUM_DATA_FILES = 5;
    /** Number of rows in each written data file. */
    private static final int ROWS_PER_DATA_FILE = 100;
    /** Number of position delete files written for each partition. */
    private static final int DELETE_FILES_PER_PARTITION = 2;
    /** Number of row positions deleted from each data file. */
    private static final int DELETE_FILE_SIZE = 10;
    /** Scratch folder that receives the locally written position delete files. */
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /**
     * Supplies the constructor arguments for the parameterized runner.
     *
     * <p>Each row holds, in order, the catalog name, the catalog implementation and the catalog
     * properties. The display-name pattern labels exactly those three positions.
     *
     * @return a single row that targets the Hive catalog
     */
    @Parameterized.Parameters(name="catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        return new Object[][]{{SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS}};
    }

    /**
     * Creates a test instance for one row produced by {@code parameters()}.
     *
     * @param catalogName name of the catalog under test
     * @param implementation catalog implementation identifier, forwarded to the base class
     * @param config catalog properties, forwarded to the base class
     */
    public TestRewritePositionDeleteFiles(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    /** Drops the test table, if it exists, after every test. */
    @After
    public void cleanup() {
        Object[] dropArgs = {tableName};
        sql("DROP TABLE IF EXISTS %s", dropArgs);
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code date} column. */
    @Test
    public void testDatePartition() throws Exception {
        createTable("date");
        Date firstDay = Date.valueOf("2023-01-01");
        // Every data file gets its own calendar day, counted forward from the first day.
        Function<Integer, Date> dayForFile = offset -> Date.valueOf(firstDay.toLocalDate().plusDays(offset.longValue()));
        insertData(dayForFile);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code boolean} column. */
    @Test
    public void testBooleanPartition() throws Exception {
        final int fileCount = 2;
        createTable("boolean");
        // Even file indexes map to true and odd ones to false.
        Function<Integer, Boolean> isEvenIndex = index -> index % 2 == 0;
        insertData(isEvenIndex, fileCount);
        testDanglingDelete(fileCount);
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code timestamp} column. */
    @Test
    public void testTimestampPartition() throws Exception {
        createTable("timestamp");
        Timestamp firstInstant = Timestamp.valueOf("2023-01-01 15:30:00");
        // Every data file gets a timestamp one day later than the previous file.
        Function<Integer, Timestamp> instantForFile = offset -> Timestamp.valueOf(firstInstant.toLocalDateTime().plusDays(offset.longValue()));
        insertData(instantForFile);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code byte} column. */
    @Test
    public void testBytePartition() throws Exception {
        createTable("byte");
        // The file index itself is the partition value.
        Function<Integer, Integer> indexAsValue = Function.identity();
        insertData(indexAsValue);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code decimal(18, 10)} column. */
    @Test
    public void testDecimalPartition() throws Exception {
        createTable("decimal(18, 10)");
        BigDecimal firstValue = new BigDecimal("1.0");
        // Partition values are 1.0, 2.0, 3.0, ... - one per data file.
        Function<Integer, BigDecimal> decimalForFile = offset -> firstValue.add(BigDecimal.valueOf(offset.longValue()));
        insertData(decimalForFile);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code binary} column. */
    @Test
    public void testBinaryPartition() throws Exception {
        createTable("binary");
        // The partition value is the four-byte encoding of the file index.
        Function<Integer, byte[]> bytesForFile = index -> {
            ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(index);
            return buffer.array();
        };
        insertData(bytesForFile);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code char(10)} column. */
    @Test
    public void testCharPartition() throws Exception {
        createTable("char(10)");
        // The partition value is the decimal text of the file index.
        Function<Integer, String> indexAsText = index -> Integer.toString(index);
        insertData(indexAsText);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by a {@code varchar(10)} column. */
    @Test
    public void testVarcharPartition() throws Exception {
        createTable("varchar(10)");
        // The partition value is the decimal text of the file index.
        Function<Integer, String> indexAsText = index -> index.toString();
        insertData(indexAsText);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by an {@code int} column. */
    @Test
    public void testIntPartition() throws Exception {
        createTable("int");
        // The file index itself is the partition value.
        Function<Integer, Integer> indexAsValue = Function.identity();
        insertData(indexAsValue);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes on a table partitioned by {@code days(...)} of a timestamp column. */
    @Test
    public void testDaysPartitionTransform() throws Exception {
        String daysTransform = "days(" + PARTITION_COL + ")";
        createTable("timestamp", PARTITION_COL, daysTransform);
        Timestamp firstInstant = Timestamp.valueOf("2023-01-01 15:30:00");
        // Consecutive files are one day apart.
        Function<Integer, Timestamp> instantForFile = offset -> Timestamp.valueOf(firstInstant.toLocalDateTime().plusDays(offset.longValue()));
        insertData(instantForFile);
        testDanglingDelete();
    }

    /** Rewrites dangling deletes when one of the partition values is {@code null}. */
    @Test
    public void testNullTransform() throws Exception {
        final int fileCount = 2;
        createTable("int");
        // The first file gets a null partition value; every later file gets 1.
        Function<Integer, Integer> nullThenOne = index -> {
            if (index == 0) {
                return null;
            }
            return Integer.valueOf(1);
        };
        insertData(nullThenOne, fileCount);
        testDanglingDelete(fileCount);
    }

    /** Rewrites dangling deletes when the partition column name contains a dot. */
    @Test
    public void testPartitionColWithDot() throws Exception {
        // Back-ticks quote the dotted name; the same text serves as column name and partition transform.
        String dottedColumn = "`partition.col`";
        createTable("int", dottedColumn, dottedColumn);
        Function<Integer, Integer> indexAsValue = Function.identity();
        insertData(dottedColumn, indexAsValue, NUM_DATA_FILES);
        testDanglingDelete(dottedColumn, NUM_DATA_FILES);
    }

    /** Runs the dangling-delete scenario with the default column and {@code NUM_DATA_FILES} data files. */
    private void testDanglingDelete() throws Exception {
        testDanglingDelete(NUM_DATA_FILES);
    }

    /**
     * Runs the dangling-delete scenario against the default partition column.
     *
     * @param numDataFiles number of data files the table is expected to contain
     */
    private void testDanglingDelete(int numDataFiles) throws Exception {
        final String defaultColumn = PARTITION_COL;
        testDanglingDelete(defaultColumn, numDataFiles);
    }

    /**
     * Checks that rewriting position deletes removes delete files left dangling by a data-file rewrite.
     *
     * <p>Steps: rewrite all data files, write position deletes against the files that existed
     * before that rewrite, rewrite the position deletes, then assert that no delete files remain
     * and that the table rows are unchanged.
     *
     * @param partitionCol partition column used as the leading sort key when comparing rows
     * @param numDataFiles number of data files expected before the rewrite
     */
    private void testDanglingDelete(String partitionCol, int numDataFiles) throws Exception {
        Table table = Spark3Util.loadIcebergTable(spark, tableName);

        List<DataFile> originalDataFiles = dataFiles(table);
        Assertions.assertThat(originalDataFiles).hasSize(numDataFiles);

        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .option("rewrite-all", "true")
            .execute();

        // These deletes target the files captured before the data-file rewrite above.
        writePosDeletesForFiles(table, originalDataFiles);
        List<DeleteFile> danglingDeletes = deleteFiles(table);
        Assertions.assertThat(danglingDeletes).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION);

        List<Object[]> rowsBefore = records(tableName, partitionCol);

        RewritePositionDeleteFiles.Result rewriteResult = SparkActions.get(spark)
            .rewritePositionDeletes(table)
            .option("rewrite-all", "true")
            .execute();

        List<DeleteFile> remainingDeletes = deleteFiles(table);
        Assertions.assertThat(remainingDeletes).as("Remaining dangling deletes").isEmpty();
        List<DeleteFile> noNewDeletes = Lists.newArrayList();
        checkResult(rewriteResult, danglingDeletes, noNewDeletes, numDataFiles);

        List<Object[]> rowsAfter = records(tableName, partitionCol);
        assertEquals("Rows must match", rowsBefore, rowsAfter);
    }

    /**
     * Creates the test table partitioned directly by the default partition column.
     *
     * @param partitionType SQL type of the partition column
     */
    private void createTable(String partitionType) {
        final String column = PARTITION_COL;
        // With no transform, the partition expression is the column itself.
        createTable(partitionType, column, column);
    }

    /**
     * Creates the format-version 2 Iceberg test table with columns {@code id}, the partition
     * column, {@code c1} and {@code c2}.
     *
     * @param partitionType SQL type of the partition column
     * @param partitionCol name of the partition column, quoted if necessary
     * @param partitionTransform expression placed inside {@code PARTITIONED BY (...)}
     */
    private void createTable(String partitionType, String partitionCol, String partitionTransform) {
        String ddlTemplate = "CREATE TABLE %s (id long, %s %s, c1 string, c2 string) USING iceberg PARTITIONED BY (%s) TBLPROPERTIES('format-version'='2')";
        Object[] ddlArgs = {tableName, partitionCol, partitionType, partitionTransform};
        sql(ddlTemplate, ddlArgs);
    }

    /**
     * Writes {@code NUM_DATA_FILES} data files into the default partition column.
     *
     * @param partitionValueFunction maps a file index to that file's partition value
     */
    private void insertData(Function<Integer, ?> partitionValueFunction) throws Exception {
        insertData(partitionValueFunction, NUM_DATA_FILES);
    }

    /**
     * Writes the requested number of data files into the default partition column.
     *
     * @param partitionValueFunction maps a file index to that file's partition value
     * @param numDataFiles number of appends to perform
     */
    private void insertData(Function<Integer, ?> partitionValueFunction, int numDataFiles) throws Exception {
        final String defaultColumn = PARTITION_COL;
        insertData(defaultColumn, partitionValueFunction, numDataFiles);
    }

    /**
     * Appends {@code numDataFiles} batches of {@code ROWS_PER_DATA_FILE} rows to the test table.
     *
     * <p>Each batch has ids counted from zero, a constant partition value obtained from
     * {@code partitionValue} for the batch index, and {@code c1}/{@code c2} set to the id as text.
     *
     * @param partitionCol name of the partition column to fill
     * @param partitionValue maps a batch index to that batch's partition value
     * @param numDataFiles number of batches to append
     */
    private void insertData(String partitionCol, Function<Integer, ?> partitionValue, int numDataFiles) throws Exception {
        int fileIndex = 0;
        while (fileIndex < numDataFiles) {
            Object valueForFile = partitionValue.apply(fileIndex);
            Dataset<Row> batch = spark.range(0L, ROWS_PER_DATA_FILE)
                .withColumn(partitionCol, functions.lit(valueForFile))
                .withColumn("c1", functions.expr("CAST(id AS STRING)"))
                .withColumn("c2", functions.expr("CAST(id AS STRING)"));
            appendAsFile(batch);
            fileIndex++;
        }
    }

    /**
     * Appends the rows to the test table after re-applying the table's own Spark schema and
     * coalescing to one partition (presumably so that each append yields a single data file).
     *
     * @param df rows to append
     */
    private void appendAsFile(Dataset<Row> df) throws Exception {
        StructType tableSchema = spark.table(tableName).schema();
        Dataset<Row> conformed = spark.createDataFrame(df.rdd(), tableSchema);
        conformed.coalesce(1).writeTo(tableName).append();
    }

    /**
     * Writes position delete files that target the given data files and commits them to the
     * table in one row delta.
     *
     * <p>The data files are grouped by partition. The first {@code DELETE_FILE_SIZE} row positions
     * of every data file are deleted, and each partition's deletes are split evenly across
     * {@code DELETE_FILES_PER_PARTITION} delete files written into the temporary folder.
     *
     * @param table table that receives the delete files
     * @param files data files whose leading rows are marked as deleted
     * @throws IOException if a temporary file cannot be created or a delete file cannot be written
     */
    private void writePosDeletesForFiles(Table table, List<DataFile> files) throws IOException {
        Map<StructLike, List<DataFile>> filesByPartition = files.stream().collect(Collectors.groupingBy(ContentFile::partition));
        List<DeleteFile> deleteFiles = Lists.newArrayListWithCapacity(DELETE_FILES_PER_PARTITION * filesByPartition.size());
        for (Map.Entry<StructLike, List<DataFile>> filesByPartitionEntry : filesByPartition.entrySet()) {
            StructLike partition = filesByPartitionEntry.getKey();
            List<DataFile> partitionFiles = filesByPartitionEntry.getValue();
            int deletesForPartition = partitionFiles.size() * DELETE_FILE_SIZE;
            // A remainder would never reach the flush threshold below and would be silently dropped.
            Assertions.assertThat(deletesForPartition % DELETE_FILES_PER_PARTITION)
                .as("Deletes per partition must split evenly across the delete files of that partition")
                .isEqualTo(0);
            int deleteFileSize = deletesForPartition / DELETE_FILES_PER_PARTITION;
            int counter = 0;
            List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
            for (DataFile partitionFile : partitionFiles) {
                // The position is a long so that it boxes to Long, which is what writeDeleteFile reads.
                for (long deletePos = 0; deletePos < DELETE_FILE_SIZE; ++deletePos) {
                    deletes.add(Pair.of(partitionFile.path(), deletePos));
                    if (++counter != deleteFileSize) {
                        continue;
                    }
                    // Buffer is full: write it out as one delete file and start the next one.
                    OutputFile output = Files.localOutput(temp.newFile());
                    deleteFiles.add(writeDeleteFile(table, output, partition, deletes));
                    counter = 0;
                    deletes.clear();
                }
            }
        }
        RowDelta rowDelta = table.newRowDelta();
        deleteFiles.forEach(rowDelta::addDeletes);
        rowDelta.commit();
    }

    /**
     * Writes one position delete file containing the given (data file path, row position) pairs.
     *
     * @param table table whose schema, spec and properties configure the writer
     * @param out destination of the delete file
     * @param partition partition the delete file belongs to
     * @param deletes pairs of data file path and row position to delete
     * @return the delete file obtained from the writer once it has been closed
     * @throws IOException if closing the writer fails
     */
    private DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike partition, List<Pair<CharSequence, Long>> deletes) throws IOException {
        GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec());
        FileFormat fileFormat = defaultFormat(table.properties());
        PositionDeleteWriter deleteWriter = appenderFactory.newPosDeleteWriter(encrypt(out), fileFormat, partition);
        // One PositionDelete instance is re-populated for every entry.
        PositionDelete reusableDelete = PositionDelete.create();
        try (PositionDeleteWriter closing = deleteWriter) {
            for (Pair<CharSequence, Long> entry : deletes) {
                CharSequence dataFilePath = entry.first();
                long rowPosition = entry.second();
                deleteWriter.write(reusableDelete.set(dataFilePath, rowPosition, null));
            }
        }
        // The delete file is requested only after the writer has been closed.
        return deleteWriter.toDeleteFile();
    }

    /**
     * Wraps a plain output file as an encrypted output file that carries empty key metadata.
     *
     * @param out output file to wrap
     * @return the wrapped output file
     */
    private static EncryptedOutputFile encrypt(OutputFile out) {
        EncryptionKeyMetadata noKeyMetadata = EncryptionKeyMetadata.EMPTY;
        return EncryptedFiles.encryptedOutput(out, noKeyMetadata);
    }

    /**
     * Resolves the file format named by the {@code write.format.default} property.
     *
     * @param properties table properties to read
     * @return the configured format, or the format for {@code "parquet"} when the property is absent
     */
    private static FileFormat defaultFormat(Map<String, String> properties) {
        return FileFormat.fromString(properties.getOrDefault("write.format.default", "parquet"));
    }

    /**
     * Reads every row of an Iceberg table, sorted by the partition column and then by {@code id}.
     *
     * @param table identifier of the table to load
     * @param partitionCol name of the leading sort column
     * @return the sorted rows converted by {@code rowsToJava}
     */
    private List<Object[]> records(String table, String partitionCol) {
        Dataset<Row> sortedRows = spark.read().format("iceberg").load(table).sort(partitionCol, "id");
        List<Row> collected = sortedRows.collectAsList();
        return rowsToJava(collected);
    }

    /**
     * Adds up the sizes of the given delete files.
     *
     * @param deleteFiles delete files to measure
     * @return total of {@code fileSizeInBytes()} over all files; zero for an empty list
     */
    private long size(List<DeleteFile> deleteFiles) {
        long totalBytes = 0L;
        for (DeleteFile deleteFile : deleteFiles) {
            totalBytes += deleteFile.fileSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Lists the data files returned by planning a scan of the table with column stats included.
     *
     * @param table table to scan
     * @return the data file of every planned scan task, in planning order
     */
    private List<DataFile> dataFiles(Table table) {
        TableScan scan = table.newScan().includeColumnStats();
        List<DataFile> planned = Lists.newArrayList();
        for (ContentScanTask<DataFile> task : scan.planFiles()) {
            planned.add(task.file());
        }
        return planned;
    }

    /**
     * Lists the delete files found by planning a batch scan of the table's
     * {@code POSITION_DELETES} metadata table.
     *
     * @param table base table whose position delete files are wanted
     * @return the delete file of every planned task, in planning order
     */
    private List<DeleteFile> deleteFiles(Table table) {
        Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
        List<DeleteFile> found = Lists.newArrayList();
        for (Object task : deletesTable.newBatchScan().planFiles()) {
            // Every task is cast to a position-deletes scan task, as in the planning result expected here.
            found.add(((PositionDeletesScanTask) task).file());
        }
        return found;
    }

    /**
     * Verifies the counters reported by a position-delete rewrite, both on the overall result and
     * summed over its per-group results.
     *
     * @param result result returned by the rewrite action
     * @param rewrittenDeletes delete files expected to have been rewritten
     * @param newDeletes delete files expected to have been added
     * @param expectedGroups expected number of per-group results
     */
    private void checkResult(RewritePositionDeleteFiles.Result result, List<DeleteFile> rewrittenDeletes, List<DeleteFile> newDeletes, int expectedGroups) {
        int expectedRewrittenCount = rewrittenDeletes.size();
        int expectedAddedCount = newDeletes.size();
        long expectedRewrittenBytes = size(rewrittenDeletes);
        long expectedAddedBytes = size(newDeletes);

        // Totals reported on the result itself.
        int rewrittenCount = result.rewrittenDeleteFilesCount();
        Assertions.assertThat(rewrittenCount).as("Rewritten delete files").isEqualTo(expectedRewrittenCount);
        int addedCount = result.addedDeleteFilesCount();
        Assertions.assertThat(addedCount).as("Added delete files").isEqualTo(expectedAddedCount);
        long rewrittenBytes = result.rewrittenBytesCount();
        Assertions.assertThat(rewrittenBytes).as("Rewritten delete bytes").isEqualTo(expectedRewrittenBytes);
        long addedBytes = result.addedBytesCount();
        Assertions.assertThat(addedBytes).as("New Delete byte count").isEqualTo(expectedAddedBytes);

        Assertions.assertThat(result.rewriteResults()).as("Rewritten group count").hasSize(expectedGroups);

        // The same totals, recomputed by summing over the per-group results.
        int groupRewrittenCount = result.rewriteResults().stream().mapToInt(RewritePositionDeleteFiles.FileGroupRewriteResult::rewrittenDeleteFilesCount).sum();
        Assertions.assertThat(groupRewrittenCount).as("Rewritten delete file count in all groups").isEqualTo(expectedRewrittenCount);
        int groupAddedCount = result.rewriteResults().stream().mapToInt(RewritePositionDeleteFiles.FileGroupRewriteResult::addedDeleteFilesCount).sum();
        Assertions.assertThat(groupAddedCount).as("Added delete file count in all groups").isEqualTo(expectedAddedCount);
        long groupRewrittenBytes = result.rewriteResults().stream().mapToLong(RewritePositionDeleteFiles.FileGroupRewriteResult::rewrittenBytesCount).sum();
        Assertions.assertThat(groupRewrittenBytes).as("Rewritten delete bytes in all groups").isEqualTo(expectedRewrittenBytes);
        long groupAddedBytes = result.rewriteResults().stream().mapToLong(RewritePositionDeleteFiles.FileGroupRewriteResult::addedBytesCount).sum();
        Assertions.assertThat(groupAddedBytes).as("Added delete bytes in all groups").isEqualTo(expectedAddedBytes);
    }
}

