/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for the {@code sys.compact} procedure, driven through Flink SQL against
 * tables created with {@code 'write-only' = 'true'}.
 *
 * <p>Each test checks the id and commit kind of the latest snapshot and, where relevant, how many
 * data files each split holds after the procedure ran.
 */
public class CompactProcedureITCase extends CatalogITCaseBase {

    /** DDL of the primary-key table partitioned by (dt, hh) that most tests start from. */
    private static final String CREATE_PARTITIONED_TABLE =
            "CREATE TABLE T ( k INT, v INT, hh INT, dt STRING, PRIMARY KEY (k, dt, hh) NOT ENFORCED)"
                    + " PARTITIONED BY (dt, hh) WITH ( 'write-only' = 'true', 'bucket' = '1')";

    /** Position of {@code hh} inside the partition row; the tables are PARTITIONED BY (dt, hh). */
    private static final int HOUR_PARTITION_FIELD = 1;

    /**
     * Explicit empty argument array for the streaming helpers. Their signatures are not visible
     * in this file, so the array is passed explicitly instead of relying on varargs.
     */
    private static final Object[] NO_ARGS = new Object[0];

    /**
     * Compacts only the {@code hh=15} partitions in batch mode, selecting them at random either
     * through {@code partitions} or through {@code where}, and expects a COMPACT snapshot with a
     * single data file in the compacted partitions and two files in the untouched one.
     */
    @Test
    public void testBatchCompact() throws Exception {
        sql(CREATE_PARTITIONED_TABLE);
        FileStoreTable table = paimonTable("T");

        sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208'), (1, 100, 15, '20221209')");
        sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208'), (2, 100, 15, '20221209')");
        checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);

        enableSyncDml();
        // Both spellings select the same partitions; one is picked per run so both get exercised.
        if (ThreadLocalRandom.current().nextBoolean()) {
            sql("CALL sys.compact(`table` => 'default.T', partitions => 'dt=20221208,hh=15;dt=20221209,hh=15')");
        } else {
            sql("CALL sys.compact(`table` => 'default.T', `where` => '(dt=20221208 and hh=15) or (dt=20221209 and hh=15)')");
        }
        checkLatestSnapshot(table, 3L, Snapshot.CommitKind.COMPACT);

        for (DataSplit split : readDataSplits(table, 3)) {
            assertDataFileCount(split, hourOf(split) == 15 ? 1 : 2);
        }
    }

    /**
     * Runs the procedure as a streaming job on a table with the {@code full-compaction} changelog
     * producer and checks that a concurrently running streaming SELECT only receives rows of the
     * compacted {@code hh=15} partitions, first as inserts and then as update pairs.
     */
    @Test
    public void testStreamingCompact() throws Exception {
        sql("CREATE TABLE T ( k INT, v INT, hh INT, dt STRING, PRIMARY KEY (k, dt, hh) NOT ENFORCED)"
                + " PARTITIONED BY (dt, hh) WITH ( 'write-only' = 'true', 'bucket' = '1',"
                + " 'changelog-producer' = 'full-compaction', 'full-compaction.delta-commits' = '1',"
                + " 'continuous.discovery-interval' = '1s')");
        FileStoreTable table = paimonTable("T");

        BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT * FROM T", NO_ARGS);
        // Close the streaming query even when an assertion below fails, so it is not left running.
        try {
            sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208'), (1, 100, 15, '20221209')");
            checkLatestSnapshot(table, 1L, Snapshot.CommitKind.APPEND);

            // Before any compaction the stream scan of this table is expected to plan no splits.
            StreamTableScan scan = table.newReadBuilder().newStreamScan();
            TableScan.Plan plan = scan.plan();
            Assertions.assertThat(plan.splits()).isEmpty();

            // Same partition selection as the batch test, expressed in one of two ways at random.
            if (ThreadLocalRandom.current().nextBoolean()) {
                streamSqlIter(
                                "CALL sys.compact(`table` => 'default.T', partitions => 'dt=20221208,hh=15;dt=20221209,hh=15', options => 'scan.parallelism=1')",
                                NO_ARGS)
                        .close();
            } else {
                streamSqlIter(
                                "CALL sys.compact(`table` => 'default.T', `where` => '(dt=20221208 and hh=15) or (dt=20221209 and hh=15)', options => 'scan.parallelism=1')",
                                NO_ARGS)
                        .close();
            }

            Assertions.assertThat(select.collect(2))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 100, 15, "20221208"), Row.of(1, 100, 15, "20221209"));

            sql("INSERT INTO T VALUES (1, 101, 15, '20221208'), (1, 101, 16, '20221208'), (1, 101, 15, '20221209')");

            Assertions.assertThat(select.collect(4))
                    .containsExactlyInAnyOrder(
                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, 100, 15, "20221208"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 1, 101, 15, "20221208"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, 100, 15, "20221209"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 1, 101, 15, "20221209"));
        } finally {
            select.close();
        }
    }

    /**
     * Compacts with {@code partition_idle_time => '5s'} after one partition received a fresh
     * write, and expects the {@code hh=15} partitions to end up with one data file each while the
     * recently written {@code hh=16} partition keeps its three files.
     */
    @Test
    public void testHistoryPartitionCompact() throws Exception {
        sql(CREATE_PARTITIONED_TABLE);
        FileStoreTable table = paimonTable("T");

        sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208'), (1, 100, 15, '20221209')");
        sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208'), (2, 100, 15, '20221209')");

        // Wait as long as the idle time passed to the procedure before touching hh=16 again.
        // NOTE(review): presumably this makes the earlier writes count as idle - confirm against
        // the procedure's documentation.
        Thread.sleep(5000L);
        sql("INSERT INTO T VALUES (3, 100, 16, '20221208')");

        enableSyncDml();
        sql("CALL sys.compact(`table` => 'default.T', partition_idle_time => '5s')");
        checkLatestSnapshot(table, 4L, Snapshot.CommitKind.COMPACT);

        for (DataSplit split : readDataSplits(table, 3)) {
            assertDataFileCount(split, hourOf(split) == 15 ? 1 : 3);
        }
    }

    /**
     * Writes random rows in several commits into a table without an explicit bucket option, then
     * runs a z-order sort compaction and expects it to be committed as one OVERWRITE snapshot.
     */
    @Test
    public void testDynamicBucketSortCompact() throws Exception {
        sql("CREATE TABLE T ( f0 BIGINT PRIMARY KEY NOT ENFORCED, f1 BIGINT, f2 BIGINT, f3 BIGINT, f4 STRING)"
                + " WITH ( 'write-only' = 'true', 'dynamic-bucket.target-row-num' = '100',"
                + " 'zorder.var-length-contribution' = '14')");
        FileStoreTable table = paimonTable("T");

        ThreadLocalRandom random = ThreadLocalRandom.current();
        int commitTimes = 20;
        for (int i = 0; i < commitTimes; ++i) {
            // One INSERT of 100 random rows per commit.
            String value =
                    IntStream.range(0, 100)
                            .mapToObj(
                                    ignored ->
                                            String.format(
                                                    "(%s, %s, %s, %s, '%s')",
                                                    random.nextLong(),
                                                    random.nextLong(),
                                                    random.nextLong(),
                                                    random.nextLong(),
                                                    StringUtils.randomNumericString(14)))
                            .collect(Collectors.joining(","));
            sql("INSERT INTO T VALUES %s", value);
        }
        // Snapshot ids are derived from commitTimes so the expectations follow the loop bound.
        checkLatestSnapshot(table, commitTimes, Snapshot.CommitKind.APPEND);

        enableSyncDml();
        sql("CALL sys.compact(`table` => 'default.T', order_strategy => 'zorder', order_by => 'f2,f1')");
        checkLatestSnapshot(table, commitTimes + 1, Snapshot.CommitKind.OVERWRITE);
    }

    /**
     * Checks the {@code minor} strategy with {@code num-sorted-run.compaction-trigger=3}: after
     * two commits the procedure leaves the latest snapshot unchanged, after a third write to
     * {@code hh=15} it produces a COMPACT snapshot that merges only that partition.
     */
    @Test
    public void testBatchMinorCompactStrategy() throws Exception {
        sql(CREATE_PARTITIONED_TABLE);
        FileStoreTable table = paimonTable("T");
        enableSyncDml();

        sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208')");
        sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208')");
        checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);

        // Two commits only: the latest snapshot must still be the second APPEND afterwards.
        sql("CALL sys.compact(`table` => 'default.T', compact_strategy => 'minor', options => 'num-sorted-run.compaction-trigger=3')");
        checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);

        // A third write to hh=15 only.
        sql("INSERT INTO T VALUES (1, 100, 15, '20221208')");
        sql("CALL sys.compact(`table` => 'default.T', compact_strategy => 'minor', options => 'num-sorted-run.compaction-trigger=3')");
        checkLatestSnapshot(table, 4L, Snapshot.CommitKind.COMPACT);

        for (DataSplit split : readDataSplits(table, 2)) {
            assertDataFileCount(split, hourOf(split) == 16 ? 2 : 1);
        }
    }

    /**
     * Checks the {@code full} strategy in batch mode: with the same trigger option as the minor
     * test and only two commits, every partition is still compacted down to a single data file.
     */
    @Test
    public void testBatchFullCompactStrategy() throws Exception {
        sql(CREATE_PARTITIONED_TABLE);
        FileStoreTable table = paimonTable("T");
        enableSyncDml();

        sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208')");
        sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208')");
        checkLatestSnapshot(table, 2L, Snapshot.CommitKind.APPEND);

        sql("CALL sys.compact(`table` => 'default.T', compact_strategy => 'full', options => 'num-sorted-run.compaction-trigger=3')");
        checkLatestSnapshot(table, 3L, Snapshot.CommitKind.COMPACT);

        for (DataSplit split : readDataSplits(table, 2)) {
            assertDataFileCount(split, 1);
        }
    }

    /** Expects the {@code full} strategy to be rejected when the procedure runs as a streaming job. */
    @Test
    public void testStreamFullCompactStrategy() throws Exception {
        sql(CREATE_PARTITIONED_TABLE);
        enableSyncDml();

        Assertions.assertThatThrownBy(
                        () ->
                                streamSqlIter(
                                                "CALL sys.compact(`table` => 'default.T', compact_strategy => 'full', options => 'num-sorted-run.compaction-trigger=3')",
                                                NO_ARGS)
                                        .close())
                .hasMessageContaining(
                        "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
    }

    /** Sets {@code table.dml-sync} so that the following statements are executed synchronously. */
    private void enableSyncDml() {
        this.tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
    }

    /**
     * Reads the data splits of the table's current snapshot and asserts how many there are.
     *
     * @param table table to read
     * @param expectedSplitCount number of splits the read must return
     * @return the splits, for further per-split assertions
     */
    private static List<DataSplit> readDataSplits(FileStoreTable table, int expectedSplitCount) {
        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
        Assertions.assertThat(splits).hasSize(expectedSplitCount);
        return splits;
    }

    /** Returns the {@code hh} partition value of the given split. */
    private static int hourOf(DataSplit split) {
        return split.partition().getInt(HOUR_PARTITION_FIELD);
    }

    /** Asserts the number of data files in a split, naming its {@code hh} partition on failure. */
    private static void assertDataFileCount(DataSplit split, int expectedFiles) {
        Assertions.assertThat(split.dataFiles())
                .as("data files of split in partition hh=%s", hourOf(split))
                .hasSize(expectedFiles);
    }

    /**
     * Asserts that the table's latest snapshot has the given id and commit kind.
     *
     * @param table table whose snapshot manager is inspected
     * @param snapshotId expected id of the latest snapshot
     * @param commitKind expected commit kind of the latest snapshot
     */
    private void checkLatestSnapshot(FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
        SnapshotManager snapshotManager = table.snapshotManager();
        Long latestId = snapshotManager.latestSnapshotId();
        // Fail with a readable message instead of a bare NullPointerException from unboxing.
        Assertions.assertThat(latestId).as("latest snapshot id (expected %s)", snapshotId).isNotNull();

        Snapshot snapshot = snapshotManager.snapshot(latestId);
        Assertions.assertThat(snapshot.id()).isEqualTo(snapshotId);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(commitKind);
    }
}

