/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.ExpireChangelogsAction;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class ExpireChangelogsProcedureITCase
extends CatalogITCaseBase {
    @Test
    public void testExpireChangelogsProcedure() throws Exception {
        int i;
        this.sql("CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT) WITH ( 'num-sorted-run.compaction-trigger' = '9999', 'changelog-producer' = 'input', 'snapshot.num-retained.min' = '4', 'snapshot.num-retained.max' = '4', 'changelog.num-retained.min' = '10', 'changelog.num-retained.max' = '10' )", new Object[0]);
        FileStoreTable table = this.paimonTable("word_count");
        SnapshotManager snapshotManager = table.snapshotManager();
        ChangelogManager changelogManager = table.changelogManager();
        for (int i2 = 0; i2 < 10; ++i2) {
            this.sql("INSERT INTO word_count VALUES ('" + i2 + "', " + i2 + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 7, 10);
        this.checkChangelogs(changelogManager, 1, 6);
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', retain_max => 8)", new Object[0]);
        this.checkChangelogs(changelogManager, 3, 6);
        Timestamp ts7 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', older_than => '" + ts7 + "', max_deletes => 1)", new Object[0]);
        this.checkChangelogs(changelogManager, 4, 6);
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', older_than => '" + ts7 + "', retain_min => 6)", new Object[0]);
        this.checkChangelogs(changelogManager, 5, 6);
        this.sql("CALL sys.expire_changelogs(`table`  => 'default.word_count', older_than => '" + ts7 + "')", new Object[0]);
        this.checkChangelogs(changelogManager, 6, 6);
        for (i = 10; i < 12; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 9, 12);
        this.checkChangelogs(changelogManager, 6, 8);
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', retain_max => 4)", new Object[0]);
        this.checkChangelogs(changelogManager, 8, 8);
        this.checkBatchRead(12);
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', delete_all => true)", new Object[0]);
        this.checkAllDeleted(changelogManager);
        this.checkStreamRead(9, 4);
        this.checkBatchRead(12);
        for (i = 12; i < 15; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 12, 15);
        Assertions.assertThat((boolean)changelogManager.fileIO().exists(new Path(changelogManager.changelogDirectory(), "EARLIEST"))).isFalse();
        this.checkChangelogs(changelogManager, 9, 11);
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', delete_all => true)", new Object[0]);
        this.checkAllDeleted(changelogManager);
        for (i = 15; i < 25; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 22, 25);
        this.checkChangelogs(changelogManager, 16, 21);
        this.sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = '4')", new Object[0]);
        this.sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = '4')", new Object[0]);
        for (i = 25; i < 30; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 27, 30);
        this.checkChangelogs(changelogManager, 16, 21);
        this.checkBatchRead(30);
        this.sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = '50')", new Object[0]);
        this.sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = '50')", new Object[0]);
        for (i = 30; i < 35; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 32, 35);
        this.checkChangelogs(changelogManager, 16, 31);
        Assertions.assertThat((int)changelogManager.safelyGetAllChangelogs().size()).isEqualTo(11);
        this.sql("ALTER TABLE word_count SET ( 'changelog.num-retained.min' = '4')", new Object[0]);
        this.sql("ALTER TABLE word_count SET ( 'changelog.num-retained.max' = '4')", new Object[0]);
        for (i = 35; i < 40; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 37, 40);
        this.checkChangelogs(changelogManager, 16, 31);
        this.sql("CALL sys.expire_changelogs(`table` => 'default.word_count', delete_all => true)", new Object[0]);
        this.checkAllDeleted(changelogManager);
        this.checkStreamRead(37, 4);
        this.checkBatchRead(40);
    }

    @Test
    public void testExpireChangelogsAction() throws Exception {
        this.sql("CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt INT) WITH ( 'num-sorted-run.compaction-trigger' = '9999', 'changelog-producer' = 'input', 'snapshot.num-retained.min' = '4', 'snapshot.num-retained.max' = '4', 'changelog.num-retained.min' = '10', 'changelog.num-retained.max' = '10' )", new Object[0]);
        FileStoreTable table = this.paimonTable("word_count");
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        SnapshotManager snapshotManager = table.snapshotManager();
        ChangelogManager changelogManager = table.changelogManager();
        for (int i = 0; i < 10; ++i) {
            this.sql("INSERT INTO word_count VALUES ('" + i + "', " + i + ")", new Object[0]);
        }
        this.checkSnapshots(snapshotManager, 7, 10);
        this.checkChangelogs(changelogManager, 1, 6);
        Timestamp ts5 = new Timestamp(changelogManager.changelog(5L).timeMillis());
        this.createAction(ExpireChangelogsAction.class, "expire_changelogs", "--warehouse", this.path, "--database", "default", "--table", "word_count", "--retain_max", "8", "--retain_min", "4", "--older_than", ts5.toString(), "--max_deletes", "3").withStreamExecutionEnvironment(env).run();
        this.checkChangelogs(changelogManager, 4, 6);
        this.createAction(ExpireChangelogsAction.class, "expire_changelogs", "--warehouse", this.path, "--database", "default", "--table", "word_count", "--delete_all", "true").withStreamExecutionEnvironment(env).run();
        this.checkAllDeleted(changelogManager);
    }

    private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
        Assertions.assertThat((long)sm.snapshotCount()).isEqualTo((long)(latest - earliest + 1));
        Assertions.assertThat((Long)sm.earliestSnapshotId()).isEqualTo((long)earliest);
        Assertions.assertThat((Long)sm.latestSnapshotId()).isEqualTo((long)latest);
    }

    private void checkChangelogs(ChangelogManager cm, int earliest, int latest) {
        Assertions.assertThat((Long)cm.earliestLongLivedChangelogId()).isEqualTo((long)earliest);
        Assertions.assertThat((Long)cm.latestLongLivedChangelogId()).isEqualTo((long)latest);
    }

    private void checkAllDeleted(ChangelogManager cm) throws IOException {
        Assertions.assertThat((int)cm.fileIO().listStatus(cm.changelogDirectory()).length).isEqualTo(0);
    }

    private void checkBatchRead(int snapshotId) {
        List<Row> rows = this.sql(String.format("select * from word_count /*+ OPTIONS('scan.snapshot-id' = '%s') */", snapshotId), new Object[0]);
        Assertions.assertThat((int)rows.size()).isEqualTo(snapshotId);
    }

    private void checkStreamRead(int snapshotId, int expect) throws Exception {
        BlockingIterator<Row, Row> iter = this.streamSqlBlockIter(String.format("select * from word_count /*+ OPTIONS('scan.snapshot-id' = '%s') */", snapshotId), new Object[0]);
        List rows = iter.collect(expect, 60L, TimeUnit.SECONDS);
        ArrayList<Row> expectedRows = new ArrayList<Row>();
        for (int i = snapshotId - 1; i < snapshotId - 1 + expect; ++i) {
            expectedRows.add(Row.of((Object[])new Object[]{String.valueOf(i), i}));
        }
        Assertions.assertThat((List)rows).hasSameElementsAs(expectedRows);
    }

    private <T extends ActionBase> T createAction(Class<T> clazz, String ... args) {
        return (T)ActionFactory.createAction((String[])args).filter(clazz::isInstance).map(clazz::cast).orElseThrow(() -> new RuntimeException("Failed to create action"));
    }
}

