/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AppendPreCommitCompactITCase
extends AbstractTestBase {
    @Test
    public void testCompactNewFiles() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv = this.tableEnvironmentBuilder().batchMode().parallelism(2).setConf(TableConfigOptions.TABLE_DML_SYNC, true).build();
        tEnv.executeSql("CREATE CATALOG mycat WITH (\n  'type' = 'paimon',\n  'warehouse' = '" + warehouse + "'\n)");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql("CREATE TABLE T (\n  pt INT,\n  a INT,\n  b STRING\n) PARTITIONED BY (pt) WITH (\n  'write-only' = 'true',\n  'compaction.min.file-num' = '3',\n  'precommit-compact' = 'true',\n  'sink.parallelism' = '2'\n)");
        ArrayList<String> values = new ArrayList<String>();
        for (int pt = 0; pt < 2; ++pt) {
            for (int a = 0; a < 50; ++a) {
                values.add(String.format("(%d, %d, '%d')", pt, a, a * 1000));
            }
        }
        Supplier<Map> getActual = () -> {
            HashMap<String, Integer> result = new HashMap<String, Integer>();
            try (CloseableIterator it = tEnv.executeSql("SELECT * FROM T").collect();){
                while (it.hasNext()) {
                    Row row = (Row)it.next();
                    Assertions.assertThat((int)row.getArity()).isEqualTo(3);
                    result.compute(String.format("(%s, %s, '%s')", row.getField(0), row.getField(1), row.getField(2)), (k, v) -> v == null ? 1 : v + 1);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return result;
        };
        LocalFileIO fileIO = LocalFileIO.create();
        int r = 1;
        while (r <= 3) {
            tEnv.executeSql("INSERT INTO T VALUES " + String.join((CharSequence)", ", values)).await();
            Assertions.assertThat((Object[])fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(r);
            Assertions.assertThat((Object[])fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(r);
            Map actual = getActual.get();
            Assertions.assertThat(actual.keySet()).hasSameElementsAs(values);
            int e = r++;
            Assertions.assertThat(actual.values()).allMatch(i -> i == e);
        }
        tEnv.executeSql("CALL sys.compact('default.T')").await();
        Assertions.assertThat((Object[])fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(4);
        Assertions.assertThat((Object[])fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(4);
        Map actual = getActual.get();
        Assertions.assertThat(actual.keySet()).hasSameElementsAs(values);
        Assertions.assertThat(actual.values()).allMatch(i -> i == 3);
        tEnv.executeSql("CALL sys.expire_snapshots(`table` => 'default.T', retain_max => 1, retain_min => 1)").await();
        Assertions.assertThat((Object[])fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(1);
        Assertions.assertThat((Object[])fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(1);
        actual = getActual.get();
        Assertions.assertThat(actual.keySet()).hasSameElementsAs(values);
        Assertions.assertThat(actual.values()).allMatch(i -> i == 3);
    }
}

