/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class DynamicBucketTableITCase
extends CatalogITCaseBase {
    @Override
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (pt INT, pk INT, v INT, PRIMARY KEY (pt, pk) NOT ENFORCED) PARTITIONED BY (pt) WITH ( 'bucket'='-1',  'dynamic-bucket.target-row-num'='3' )");
    }

    @Test
    public void testWriteRead() {
        this.sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, 1}), Row.of((Object[])new Object[]{1, 2, 2}), Row.of((Object[])new Object[]{1, 3, 3}), Row.of((Object[])new Object[]{1, 4, 4}), Row.of((Object[])new Object[]{1, 5, 5})});
        this.sql("INSERT INTO T VALUES (1, 3, 33), (1, 1, 11)", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, 11}), Row.of((Object[])new Object[]{1, 2, 2}), Row.of((Object[])new Object[]{1, 3, 33}), Row.of((Object[])new Object[]{1, 4, 4}), Row.of((Object[])new Object[]{1, 5, 5})});
        Assertions.assertThat(this.sql("SELECT DISTINCT bucket FROM T$files", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{0}), Row.of((Object[])new Object[]{1})});
    }

    @Test
    public void testWriteWithAssignerParallelism() {
        this.sql("INSERT INTO T /*+ OPTIONS('dynamic-bucket.assigner-parallelism'='3') */ VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)", new Object[0]);
        Assertions.assertThat(this.sql("SELECT DISTINCT bucket FROM T$files", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{0}), Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2})});
    }

    @Test
    public void testOverwrite() throws Exception {
        this.sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, 1}), Row.of((Object[])new Object[]{1, 2, 2}), Row.of((Object[])new Object[]{1, 3, 3}), Row.of((Object[])new Object[]{1, 4, 4}), Row.of((Object[])new Object[]{1, 5, 5})});
        this.sql("INSERT OVERWRITE T SELECT * FROM T LIMIT 4", new Object[0]);
        AbstractFileStoreTable table = (AbstractFileStoreTable)CatalogFactory.createCatalog((CatalogContext)CatalogContext.create((Path)new Path(this.path))).getTable(Identifier.create((String)"default", (String)"T"));
        IndexFileHandler indexFileHandler = table.store().newIndexFileHandler();
        List partitions = table.newScan().listPartitions();
        ArrayList entries = new ArrayList();
        partitions.forEach(p -> entries.addAll(indexFileHandler.scan("HASH", p)));
        Long records = entries.stream().map(entry -> entry.indexFile().rowCount()).reduce(Long::sum).get();
        Assertions.assertThat((Long)records).isEqualTo(4L);
    }
}

