/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.io.IOException;
import java.sql.Timestamp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.ExpireSnapshotsAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class ExpireSnapshotsProcedureITCase
extends CatalogITCaseBase {
    @Test
    public void testExpireSnapshotsProcedure() throws Exception {
        this.sql("CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT) WITH ( 'num-sorted-run.compaction-trigger' = '9999' )", new Object[0]);
        FileStoreTable table = this.paimonTable("word_count");
        SnapshotManager snapshotManager = table.snapshotManager();
        for (int i = 0; i < 6; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 1, 6);
        this.sql("CALL sys.expire_snapshots(`table` => 'default.word_count', retain_max => 5)", new Object[0]);
        this.checkSnapshots(snapshotManager, 2, 6);
        Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
        this.sql("CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '" + ts6.toString() + "', max_deletes => 1)", new Object[0]);
        this.checkSnapshots(snapshotManager, 3, 6);
        this.sql("CALL sys.expire_snapshots(`table` => 'default.word_count', older_than => '" + ts6.toString() + "', retain_min => 3)", new Object[0]);
        this.checkSnapshots(snapshotManager, 4, 6);
        this.sql("CALL sys.expire_snapshots(`table`  => 'default.word_count', older_than => '" + ts6.toString() + "')", new Object[0]);
        this.checkSnapshots(snapshotManager, 6, 6);
    }

    @Test
    public void testExpireSnapshotsAction() throws Exception {
        this.sql("CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT) WITH ( 'num-sorted-run.compaction-trigger' = '9999' )", new Object[0]);
        FileStoreTable table = this.paimonTable("word_count");
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        SnapshotManager snapshotManager = table.snapshotManager();
        for (int i = 0; i < 6; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 1, 6);
        this.createAction(ExpireSnapshotsAction.class, "expire_snapshots", "--warehouse", this.path, "--database", "default", "--table", "word_count", "--retain_max", "5").withStreamExecutionEnvironment(env).run();
        this.checkSnapshots(snapshotManager, 2, 6);
        Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
        this.createAction(ExpireSnapshotsAction.class, "expire_snapshots", "--warehouse", this.path, "--database", "default", "--table", "word_count", "--older_than", ts6.toString(), "--max_deletes", "1").withStreamExecutionEnvironment(env).run();
        this.checkSnapshots(snapshotManager, 3, 6);
        this.createAction(ExpireSnapshotsAction.class, "expire_snapshots", "--warehouse", this.path, "--database", "default", "--table", "word_count", "--older_than", ts6.toString(), "--retain_min", "3").withStreamExecutionEnvironment(env).run();
        this.checkSnapshots(snapshotManager, 4, 6);
        this.createAction(ExpireSnapshotsAction.class, "expire_snapshots", "--warehouse", this.path, "--database", "default", "--table", "word_count", "--older_than", ts6.toString()).withStreamExecutionEnvironment(env).run();
        this.checkSnapshots(snapshotManager, 6, 6);
    }

    private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
        Assertions.assertThat((long)sm.snapshotCount()).isEqualTo((long)(latest - earliest + 1));
        Assertions.assertThat((Long)sm.earliestSnapshotId()).isEqualTo((long)earliest);
        Assertions.assertThat((Long)sm.latestSnapshotId()).isEqualTo((long)latest);
    }

    private <T extends ActionBase> T createAction(Class<T> clazz, String ... args) {
        return (T)ActionFactory.createAction((String[])args).filter(clazz::isInstance).map(clazz::cast).orElseThrow(() -> new RuntimeException("Failed to create action"));
    }
}

