/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.actions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkCatalogTestBase;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestRewriteDataFilesAction
extends FlinkCatalogTestBase {
    // Names of the three tables that before() creates and clean() drops.
    private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
    private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
    private static final String TABLE_NAME_WITH_PK = "test_table_with_pk";
    // File format of the current parameterized run; assigned once in the constructor.
    private final FileFormat format;
    // Iceberg table handles for the tables above, loaded through validationCatalog in before().
    private Table icebergTableUnPartitioned;
    private Table icebergTablePartitioned;
    private Table icebergTableWithPk;
    // Temporary folder rule; testRewriteAvoidRepeateCompress uses it to create a local data file.
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /**
     * Creates one parameterized test instance.
     *
     * @param catalogName catalog name, forwarded unchanged to the base test class
     * @param baseNamespace base namespace, forwarded unchanged to the base test class
     * @param format file format kept on this instance; before() writes its name into the
     *     {@code write.format.default} property of every table it creates
     */
    public TestRewriteDataFilesAction(String catalogName, Namespace baseNamespace, FileFormat format) {
        super(catalogName, baseNamespace);
        this.format = format;
    }

    /**
     * Returns the base class's table environment after setting its default parallelism to 1.
     *
     * @return the table environment obtained from the superclass, with
     *     {@link CoreOptions#DEFAULT_PARALLELISM} set to 1
     */
    @Override
    protected TableEnvironment getTableEnv() {
        // Fetch the environment once so the instance that is configured is the one returned.
        TableEnvironment tableEnv = super.getTableEnv();
        // Pass a plain int: the option is typed, so an Object-typed value does not match it.
        tableEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);
        return tableEnv;
    }

    /**
     * Builds the parameter sets for the parameterized runner: every catalog configuration
     * returned by {@link FlinkCatalogTestBase#parameters()} combined with each of the AVRO,
     * ORC and PARQUET file formats.
     *
     * @return one {@code {catalogName, baseNamespace, format}} array per combination
     */
    @Parameterized.Parameters(name="catalogName={0}, baseNamespace={1}, format={2}")
    public static Iterable<Object[]> parameters() {
        List<Object[]> parameters = Lists.newArrayList();
        for (FileFormat format : new FileFormat[]{FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
            for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
                String catalogName = (String) catalogParams[0];
                Namespace baseNamespace = (Namespace) catalogParams[1];
                parameters.add(new Object[]{catalogName, baseNamespace, format});
            }
        }
        return parameters;
    }

    /**
     * Creates the test database and the three tables used by the tests (unpartitioned,
     * partitioned by {@code data} and {@code spec}, and a format-version 2 table with a
     * primary key), then loads an Iceberg handle for each through the validation catalog.
     */
    @Override
    @Before
    public void before() {
        super.before();
        String formatName = this.format.name();

        this.sql("CREATE DATABASE %s", this.flinkDatabase);
        this.sql("USE CATALOG %s", this.catalogName);
        this.sql("USE %s", "db");

        this.sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED, formatName);
        TableIdentifier unpartitionedId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_UNPARTITIONED);
        this.icebergTableUnPartitioned = this.validationCatalog.loadTable(unpartitionedId);

        this.sql("CREATE TABLE %s (id int, data varchar,spec varchar)  PARTITIONED BY (data,spec) with ('write.format.default'='%s')", TABLE_NAME_PARTITIONED, formatName);
        TableIdentifier partitionedId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_PARTITIONED);
        this.icebergTablePartitioned = this.validationCatalog.loadTable(partitionedId);

        this.sql("CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) with ('write.format.default'='%s', 'format-version'='2')", TABLE_NAME_WITH_PK, formatName);
        TableIdentifier withPkId = TableIdentifier.of(this.icebergNamespace, TABLE_NAME_WITH_PK);
        this.icebergTableWithPk = this.validationCatalog.loadTable(withPkId);
    }

    /**
     * Drops the three test tables and then the test database before delegating to the
     * base class cleanup.
     */
    @Override
    @After
    public void clean() {
        // Same drop order as the creation order in before().
        String[] tableNames = {TABLE_NAME_UNPARTITIONED, TABLE_NAME_PARTITIONED, TABLE_NAME_WITH_PK};
        for (String tableName : tableNames) {
            this.sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
        }
        this.sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        super.clean();
    }

    /**
     * Running the rewrite action on a table without any snapshot must not create one.
     */
    @Test
    public void testRewriteDataFilesEmptyTable() throws Exception {
        Table table = this.icebergTableUnPartitioned;
        Assert.assertNull("Table must be empty", table.currentSnapshot());

        Actions.forTable(table).rewriteDataFiles().execute();

        Assert.assertNull("Table must stay empty", table.currentSnapshot());
    }

    /**
     * Two single-row inserts into the unpartitioned table produce two data files; the
     * rewrite action must replace them with one file and leave the records unchanged.
     */
    @Test
    public void testRewriteDataFilesUnpartitionedTable() throws Exception {
        this.sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
        this.sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
        this.icebergTableUnPartitioned.refresh();

        // Typed scan tasks keep the method reference well-typed; try-with-resources closes the scan.
        List<DataFile> dataFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTableUnPartitioned.newScan().planFiles()) {
            dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
        }
        Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());

        RewriteDataFilesActionResult result = Actions.forTable(this.icebergTableUnPartitioned).rewriteDataFiles().execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

        this.icebergTableUnPartitioned.refresh();
        List<DataFile> dataFilesAfter;
        try (CloseableIterable<FileScanTask> tasksAfter = this.icebergTableUnPartitioned.newScan().planFiles()) {
            dataFilesAfter = Lists.newArrayList(CloseableIterable.transform(tasksAfter, FileScanTask::file));
        }
        Assert.assertEquals("Should have 1 data files after rewrite", 1, dataFilesAfter.size());

        SimpleDataUtil.assertTableRecords(
                this.icebergTableUnPartitioned,
                Lists.newArrayList(SimpleDataUtil.createRecord(1, "hello"), SimpleDataUtil.createRecord(2, "world")));
    }

    /**
     * Four single-row inserts spread over two partitions produce four data files; the
     * rewrite action must replace them with two files and leave the records unchanged.
     */
    @Test
    public void testRewriteDataFilesPartitionedTable() throws Exception {
        this.sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 3, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        this.icebergTablePartitioned.refresh();

        // Typed scan tasks keep the method reference well-typed; try-with-resources closes the scan.
        List<DataFile> dataFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTablePartitioned.newScan().planFiles()) {
            dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
        }
        Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size());

        RewriteDataFilesActionResult result = Actions.forTable(this.icebergTablePartitioned).rewriteDataFiles().execute();
        Assert.assertEquals("Action should rewrite 4 data files", 4, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());

        this.icebergTablePartitioned.refresh();
        List<DataFile> dataFilesAfter;
        try (CloseableIterable<FileScanTask> tasksAfter = this.icebergTablePartitioned.newScan().planFiles()) {
            dataFilesAfter = Lists.newArrayList(CloseableIterable.transform(tasksAfter, FileScanTask::file));
        }
        Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesAfter.size());

        // Template record matching the (id, data, spec) columns declared for the partitioned table.
        Schema schema = new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()),
                Types.NestedField.optional(3, "spec", Types.StringType.get()));
        GenericRecord record = GenericRecord.create(schema);
        SimpleDataUtil.assertTableRecords(
                this.icebergTablePartitioned,
                Lists.newArrayList(
                        record.copy("id", 1, "data", "hello", "spec", "a"),
                        record.copy("id", 2, "data", "hello", "spec", "a"),
                        record.copy("id", 3, "data", "world", "spec", "b"),
                        record.copy("id", 4, "data", "world", "spec", "b")));
    }

    /**
     * With filters {@code spec = 'a'} and {@code data} starting with {@code "he"}, only the two
     * matching data files out of five may be rewritten (into one); the other three stay as they
     * are and the table contents are unchanged.
     */
    @Test
    public void testRewriteDataFilesWithFilter() throws Exception {
        this.sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 3, 'world' ,'a'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        this.sql("INSERT INTO %s SELECT 5, 'world' ,'b'", TABLE_NAME_PARTITIONED);
        this.icebergTablePartitioned.refresh();

        // Typed scan tasks keep the method reference well-typed; try-with-resources closes the scan.
        List<DataFile> dataFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTablePartitioned.newScan().planFiles()) {
            dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
        }
        Assert.assertEquals("Should have 5 data files before rewrite", 5, dataFiles.size());

        RewriteDataFilesActionResult result = Actions.forTable(this.icebergTablePartitioned)
                .rewriteDataFiles()
                .filter(Expressions.equal("spec", "a"))
                .filter(Expressions.startsWith("data", "he"))
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

        this.icebergTablePartitioned.refresh();
        List<DataFile> dataFilesAfter;
        try (CloseableIterable<FileScanTask> tasksAfter = this.icebergTablePartitioned.newScan().planFiles()) {
            dataFilesAfter = Lists.newArrayList(CloseableIterable.transform(tasksAfter, FileScanTask::file));
        }
        Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFilesAfter.size());

        // Template record matching the (id, data, spec) columns declared for the partitioned table.
        Schema schema = new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()),
                Types.NestedField.optional(3, "spec", Types.StringType.get()));
        GenericRecord record = GenericRecord.create(schema);
        SimpleDataUtil.assertTableRecords(
                this.icebergTablePartitioned,
                Lists.newArrayList(
                        record.copy("id", 1, "data", "hello", "spec", "a"),
                        record.copy("id", 2, "data", "hello", "spec", "a"),
                        record.copy("id", 3, "data", "world", "spec", "a"),
                        record.copy("id", 4, "data", "world", "spec", "b"),
                        record.copy("id", 5, "data", "world", "spec", "b")));
    }

    /**
     * Writes 100 rows in two inserts (even ids, then odd ids), checks that a scan planned with
     * {@code ignoreResiduals()} reports an always-true residual for every task, and then verifies
     * that a filtered rewrite merges the two data files into one without changing the records.
     */
    @Test
    public void testRewriteLargeTableHasResiduals() throws IOException {
        List<String> records1 = Lists.newArrayList();
        List<String> records2 = Lists.newArrayList();
        List<Record> expected = Lists.newArrayList();
        for (int i = 0; i < 100; ++i) {
            int id = i;
            String data = String.valueOf(i % 3);
            String valuesTuple = "(" + id + ",'" + data + "')";
            // Even ids go into the first INSERT statement, odd ids into the second.
            if (i % 2 == 0) {
                records1.add(valuesTuple);
            } else {
                records2.add(valuesTuple);
            }
            Record record = SimpleDataUtil.RECORD.copy();
            record.setField("id", id);
            record.setField("data", data);
            expected.add(record);
        }
        this.sql("INSERT INTO %s values " + StringUtils.join(records1, ","), TABLE_NAME_UNPARTITIONED);
        this.sql("INSERT INTO %s values " + StringUtils.join(records2, ","), TABLE_NAME_UNPARTITIONED);
        this.icebergTableUnPartitioned.refresh();

        // Typed scan tasks make the for-each and the method reference well-typed;
        // try-with-resources closes the scan once both passes over it are done.
        List<DataFile> dataFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTableUnPartitioned.newScan()
                .ignoreResiduals()
                .filter(Expressions.equal("data", "0"))
                .planFiles()) {
            for (FileScanTask task : tasks) {
                Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
            }
            dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
        }
        Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());

        Actions actions = Actions.forTable(this.icebergTableUnPartitioned);
        RewriteDataFilesActionResult result = actions.rewriteDataFiles()
                .filter(Expressions.equal("data", "0"))
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

        SimpleDataUtil.assertTableRecords(this.icebergTableUnPartitioned, expected);
    }

    /**
     * Appends one locally written data file of at least 20000 bytes plus two single-row inserts,
     * then rewrites with a target size of that file's length plus 10 bytes. The test expects only
     * the two small files to be rewritten and the large file to remain in the table untouched.
     */
    @Test
    public void testRewriteAvoidRepeateCompress() throws IOException {
        List<Record> expected = Lists.newArrayList();
        Schema schema = this.icebergTableUnPartitioned.schema();
        GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
        File file = this.temp.newFile();
        int count = 0;
        try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), this.format)) {
            long filesize = 20000L;
            // Keep appending random rows until the appender reports the requested length.
            while (fileAppender.length() < filesize) {
                Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
                fileAppender.add(record);
                expected.add(record);
                ++count;
            }
        }

        DataFile dataFile = DataFiles.builder(this.icebergTableUnPartitioned.spec())
                .withPath(file.getAbsolutePath())
                .withFileSizeInBytes(file.length())
                .withFormat(this.format)
                .withRecordCount(count)
                .build();
        this.icebergTableUnPartitioned.newAppend().appendFile(dataFile).commit();
        this.sql("INSERT INTO %s SELECT 1,'a' ", TABLE_NAME_UNPARTITIONED);
        this.sql("INSERT INTO %s SELECT 2,'b' ", TABLE_NAME_UNPARTITIONED);
        this.icebergTableUnPartitioned.refresh();

        // Typed scan tasks keep the method reference well-typed; try-with-resources closes the scan.
        List<DataFile> dataFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTableUnPartitioned.newScan().planFiles()) {
            dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
        }
        Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());

        Actions actions = Actions.forTable(this.icebergTableUnPartitioned);
        long targetSizeInBytes = file.length() + 10L;
        RewriteDataFilesActionResult result = actions.rewriteDataFiles()
                .targetSizeInBytes(targetSizeInBytes)
                .splitOpenFileCost(1L)
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

        this.icebergTableUnPartitioned.refresh();
        List<DataFile> dataFilesRewrote;
        try (CloseableIterable<FileScanTask> tasksAfter = this.icebergTableUnPartitioned.newScan().planFiles()) {
            dataFilesRewrote = Lists.newArrayList(CloseableIterable.transform(tasksAfter, FileScanTask::file));
        }
        Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesRewrote.size());

        // Compare as Strings: equality between different CharSequence implementations is not
        // defined, so converting first makes the contains() check independent of the path type.
        List<String> rewroteDataFileNames = dataFilesRewrote.stream()
                .map(rewrittenFile -> rewrittenFile.path().toString())
                .collect(Collectors.toList());
        Assert.assertTrue(rewroteDataFileNames.contains(file.getAbsolutePath()));

        expected.add(SimpleDataUtil.createRecord(1, "a"));
        expected.add(SimpleDataUtil.createRecord(2, "b"));
        SimpleDataUtil.assertTableRecords(this.icebergTableUnPartitioned, expected);
    }

    /**
     * After an upsert adds an equality-delete file, a rewrite started from a stale table handle
     * must fail when it commits with a new sequence number and succeed when it keeps the starting
     * sequence number. The surviving entries' sequence numbers and the table records are checked.
     */
    @Test
    public void testRewriteNoConflictWithEqualityDeletes() throws IOException {
        this.sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_WITH_PK);
        this.sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_WITH_PK);

        // Both handles are loaded before the upsert below and are never refreshed afterwards.
        Table stale1 = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE_NAME_WITH_PK));
        Table stale2 = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE_NAME_WITH_PK));

        this.sql("INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ SELECT 1, 'hi'", TABLE_NAME_WITH_PK);
        this.icebergTableWithPk.refresh();
        Assert.assertEquals(
                "The latest sequence number should be greater than that of the stale snapshot",
                stale1.currentSnapshot().sequenceNumber() + 1L,
                this.icebergTableWithPk.currentSnapshot().sequenceNumber());

        // Typed scan tasks keep both transforms well-typed; try-with-resources closes the scan.
        List<DataFile> dataFiles;
        Set<DeleteFile> deleteFiles;
        try (CloseableIterable<FileScanTask> tasks = this.icebergTableWithPk.newScan().planFiles()) {
            dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
            deleteFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::deletes)).stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toSet());
        }
        Assert.assertEquals("Should have 3 data files before rewrite", 3, dataFiles.size());
        Assert.assertEquals("Should have 1 delete file before rewrite", 1, deleteFiles.size());
        // Expected value first, so a failure message reports expected and actual the right way round.
        Assert.assertSame(
                "The 1 delete file should be an equality-delete file",
                FileContent.EQUALITY_DELETES,
                Iterables.getOnlyElement(deleteFiles).content());
        this.shouldHaveDataAndFileSequenceNumbers(
                TABLE_NAME_WITH_PK,
                ImmutableList.of(Pair.of(1L, 1L), Pair.of(2L, 2L), Pair.of(3L, 3L), Pair.of(3L, 3L)));

        Assertions.assertThatThrownBy(
                        () -> Actions.forTable(stale1).rewriteDataFiles().useStartingSequenceNumber(false).execute(),
                        "Rewrite using new sequence number should fail")
                .isInstanceOf(ValidationException.class);

        RewriteDataFilesActionResult result = Actions.forTable(stale2)
                .rewriteDataFiles()
                .useStartingSequenceNumber(true)
                .execute();
        Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
        Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());

        this.shouldHaveDataAndFileSequenceNumbers(
                TABLE_NAME_WITH_PK,
                ImmutableList.of(Pair.of(3L, 3L), Pair.of(3L, 3L), Pair.of(2L, 4L)));
        SimpleDataUtil.assertTableRecords(
                this.icebergTableWithPk,
                Lists.newArrayList(SimpleDataUtil.createRecord(1, "hi"), SimpleDataUtil.createRecord(2, "world")));
    }

    /**
     * Asserts that the entries of {@code tableName} with {@code status < 2} carry exactly the
     * expected (sequence_number, file_sequence_number) pairs, compared in any order.
     *
     * @param tableName table whose {@code $entries} metadata table is queried
     * @param expectedSequenceNumbers expected (sequence_number, file_sequence_number) pairs
     */
    private void shouldHaveDataAndFileSequenceNumbers(String tableName, List<Pair<Long, Long>> expectedSequenceNumbers) {
        List<Row> liveEntries = this.sql("SELECT * FROM %s$entries WHERE status < 2", tableName);
        // Explicit type arguments keep the collected pairs typed as Pair<Long, Long>.
        List<Pair<Long, Long>> actualSequenceNumbers = liveEntries.stream()
                .map(row -> Pair.<Long, Long>of(
                        row.getFieldAs("sequence_number"),
                        row.getFieldAs("file_sequence_number")))
                .collect(Collectors.toList());
        Assertions.assertThat(actualSequenceNumbers).hasSameElementsAs(expectedSequenceNumbers);
    }
}

