/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class ExpirePartitionsProcedureITCase
extends CatalogITCaseBase {
    @Test
    public void testExpirePartitionsProcedure() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        this.sql("INSERT INTO T VALUES ('1', '2024-06-01')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')", new Object[0]);
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1);
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"1:2024-06-01", "Never-expire:9999-09-09"});
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"Never-expire:9999-09-09"});
    }

    @Test
    public void testShowExpirePartitionsProcedureResults() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, hm STRING, PRIMARY KEY (k, dt, hm) NOT ENFORCED) PARTITIONED BY (dt, hm) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")).containsExactlyInAnyOrder((Object[])new String[]{"No expired partitions."});
        this.sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')", new Object[0]);
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"1:2024-06-01:01:00", "2:2024-06-02:02:00", "Never-expire:9999-09-09:99:99"});
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"Never-expire:9999-09-09:99:99"});
    }

    @Test
    public void testPartitionExpireValuesTimeStrategy() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1);
        this.sql("INSERT INTO T VALUES ('HXH', '2024-06-01')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')", new Object[0]);
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"HXH:2024-06-01", "Never-expire:9999-09-09"});
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 'values-time')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"Never-expire:9999-09-09"});
    }

    @Test
    public void testPartitionExpireUpdateTimeStrategy() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, hm STRING, PRIMARY KEY (k, dt, hm) NOT ENFORCED) PARTITIONED BY (dt, hm) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
        this.sql("INSERT INTO T VALUES ('Max-Date', '9999-09-09', '99:99')", new Object[0]);
        Thread.sleep(2500L);
        this.sql("INSERT INTO T VALUES ('HXH', '2024-06-01', '01:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('HXH', '2024-06-01', '02:00')", new Object[0]);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T',expiration_time => '2 s',expire_strategy => 'update-time')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=9999-09-09, hm=99:99"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"HXH:2024-06-01:01:00", "HXH:2024-06-01:02:00"});
        Thread.sleep(1500L);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 s', expire_strategy => 'update-time')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01, hm=01:00", "dt=2024-06-01, hm=02:00"});
        Assertions.assertThat(this.read(table, consumerReadResult)).isEmpty();
    }

    @Test
    public void testPartitionExpireUpdateTimeStrategyInOnePartition() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, hm STRING, PRIMARY KEY (k, dt, hm) NOT ENFORCED) PARTITIONED BY (dt, hm) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
        this.sql("INSERT INTO T VALUES ('HXH', '2024-06-01', '01:00')", new Object[0]);
        Thread.sleep(2500L);
        this.sql("INSERT INTO T VALUES ('HXH', '2024-06-01', '01:00')", new Object[0]);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '2 s', expire_strategy => 'update-time')")).containsExactlyInAnyOrder((Object[])new String[]{"No expired partitions."});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"HXH:2024-06-01:01:00"});
        Thread.sleep(1500L);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 s', expire_strategy => 'update-time')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01, hm=01:00"});
        Assertions.assertThat(this.read(table, consumerReadResult)).isEmpty();
    }

    @Test
    public void testPartitionExpireWithNonDateFormatPartition() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, hm STRING, PRIMARY KEY (k, dt, hm) NOT ENFORCED) PARTITIONED BY (dt, hm) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
        this.sql("INSERT INTO T VALUES ('HXH', 'pt-1', 'hm-1')", new Object[0]);
        Thread.sleep(2500L);
        this.sql("INSERT INTO T VALUES ('HXH', 'pt-2', 'hm-2')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('HXH', 'pt-3', 'hm-3')", new Object[0]);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '2 s', expire_strategy => 'update-time')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=pt-1, hm=hm-1"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"HXH:pt-2:hm-2", "HXH:pt-3:hm-3"});
        Thread.sleep(1500L);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 s', expire_strategy => 'update-time')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=pt-2, hm=hm-2", "dt=pt-3, hm=hm-3"});
        Assertions.assertThat(this.read(table, consumerReadResult)).isEmpty();
    }

    @Test
    public void testPartitionExpireWithTimePartition() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, hm STRING, PRIMARY KEY (k, dt, hm) NOT ENFORCED) PARTITIONED BY (dt, hm) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_pattern => '$dt $hm', timestamp_formatter => 'yyyy-MM-dd HH:mm')")).containsExactlyInAnyOrder((Object[])new String[]{"No expired partitions."});
        this.sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')", new Object[0]);
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"1:2024-06-01:01:00", "2:2024-06-02:02:00", "Never-expire:9999-09-09:99:99"});
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_pattern => '$dt $hm', timestamp_formatter => 'yyyy-MM-dd HH:mm')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=02:00"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"Never-expire:9999-09-09:99:99"});
    }

    @Test
    public void testSortAndLimitExpirePartition() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, hm STRING, PRIMARY KEY (k, dt, hm) NOT ENFORCED) PARTITIONED BY (dt, hm) WITH ( 'bucket' = '1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")).containsExactlyInAnyOrder((Object[])new String[]{"No expired partitions."});
        this.sql("INSERT INTO T VALUES ('3', '2024-06-02', '02:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('2', '2024-06-02', '01:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('4', '2024-06-03', '01:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')", new Object[0]);
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1) + ":" + row.getString(2);
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"1:2024-06-01:01:00", "2:2024-06-02:01:00", "3:2024-06-02:02:00", "4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99"});
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', max_expires => 3)")).containsExactly((Object[])new String[]{"dt=2024-06-01, hm=01:00", "dt=2024-06-02, hm=01:00", "dt=2024-06-02, hm=02:00"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"4:2024-06-03:01:00", "Never-expire:9999-09-09:99:99"});
    }

    @Test
    public void testNullPartitionExpire() {
        this.sql("CREATE TABLE T (k INT, ds STRING) PARTITIONED BY (ds);", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, CAST (NULL AS STRING))", new Object[0]);
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyyMMdd')")).containsExactly((Object[])new String[]{"No expired partitions."});
    }

    @Test
    public void testExpirePartitionsWithDefaultNum() throws Exception {
        this.sql("CREATE TABLE T ( k STRING, dt STRING, PRIMARY KEY (k, dt) NOT ENFORCED) PARTITIONED BY (dt) WITH ( 'bucket' = '1', 'partition.expiration-max-num'='2')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        this.sql("INSERT INTO T VALUES ('a', '2024-06-01')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('b', '2024-06-02')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('c', '2024-06-03')", new Object[0]);
        this.sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')", new Object[0]);
        Function<InternalRow, String> consumerReadResult = row -> row.getString(0) + ":" + row.getString(1);
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09"});
        Assertions.assertThat(this.callExpirePartitions("CALL sys.expire_partitions(`table` => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2024-06-01", "dt=2024-06-02"});
        Assertions.assertThat(this.read(table, consumerReadResult)).containsExactlyInAnyOrder((Object[])new String[]{"c:2024-06-03", "Never-expire:9999-09-09"});
    }

    public List<String> callExpirePartitions(String callSql) {
        return this.sql(callSql, new Object[0]).stream().map(row -> row.getField(0).toString()).collect(Collectors.toList());
    }

    private List<String> read(FileStoreTable table, Function<InternalRow, String> consumerReadResult) throws IOException {
        ArrayList<String> ret = new ArrayList<String>();
        table.newRead().createReader(table.newScan().plan().splits()).forEachRemaining(row -> ret.add((String)consumerReadResult.apply((InternalRow)row)));
        return ret;
    }
}

