/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.CompactActionITCaseBase;
import org.apache.paimon.flink.action.CompactDatabaseAction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class CompactDatabaseActionITCase
extends CompactActionITCaseBase {
    /** Database names the tests loop over when creating their initial tables. */
    private static final String[] DATABASE_NAMES = {"db1", "db2"};

    /** Table names created in each database; the last one carries the "unaware_bucket" suffix the tests key on. */
    private static final String[] TABLE_NAMES = {"t1", "t2", "t3_unaware_bucket"};

    /** Database names used by testStreamingCompact for tables created after the compaction job was started. */
    private static final String[] New_DATABASE_NAMES = {"db3", "db4"};

    /** Table names used together with {@link #New_DATABASE_NAMES}. */
    private static final String[] New_TABLE_NAMES = {"t3", "t4"};

    /** Row layout shared by all test tables: three INT columns (k, v, hh) and one STRING column (dt). */
    private static final RowType ROW_TYPE =
            RowType.of(
                    new DataType[] {
                        DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
                    },
                    new String[] {"k", "v", "hh", "dt"});

    /**
     * Supplies the (mode, invoker) argument pairs for the parameterized tests.
     *
     * <p>For each invoker ("action", "procedure_indexed", "procedure_named") both modes
     * ("combined", then "divided") are emitted, in that order.
     *
     * @return a stream of two-element argument sets: mode first, invoker second
     */
    private static Stream<Arguments> testData() {
        return Stream.of("action", "procedure_indexed", "procedure_named")
                .flatMap(
                        invoker ->
                                Stream.of("combined", "divided")
                                        .map(mode -> Arguments.of(mode, invoker)));
    }

    /**
     * Creates a table with the shared {@code ROW_TYPE} columns in the test catalog and loads it.
     *
     * <p>The database is created first; the {@code true} flag passed to {@code createDatabase}
     * is presumably "ignore if exists" (the tests call this repeatedly for the same database) —
     * confirm against the catalog API.
     *
     * @param databaseName database that will hold the table
     * @param tableName name of the table to create
     * @param partitionKeys partition columns of the new table
     * @param primaryKeys primary-key columns of the new table (may be empty)
     * @param options table options stored in the schema
     * @return the table as loaded back from the catalog
     * @throws Exception if the catalog rejects any of the calls
     */
    protected FileStoreTable createTable(
            String databaseName,
            String tableName,
            List<String> partitionKeys,
            List<String> primaryKeys,
            Map<String, String> options)
            throws Exception {
        Identifier tableId = Identifier.create(databaseName, tableName);
        catalog.createDatabase(databaseName, true);

        // Empty string is the table comment slot of the schema.
        Schema schema = new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, options, "");
        catalog.createTable(tableId, schema, false);

        return (FileStoreTable) catalog.getTable(tableId);
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="mode = {0}, invoker = {1}")
    @MethodSource(value={"testData"})
    @Timeout(value=6000L)
    public void testStreamCompactForUnawareTable(String mode, String invoker) throws Exception {
        SnapshotManager snapshotManager;
        void var5_10;
        HashMap<Identifier, FileStoreTable> tableToCompaction = new HashMap<Identifier, FileStoreTable>();
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                HashMap<String, String> option = new HashMap<String, String>();
                option.put(CoreOptions.WRITE_ONLY.key(), "true");
                if (!tableName.endsWith("unaware_bucket")) continue;
                option.put("bucket", "-1");
                option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
                ArrayList keys = Lists.newArrayList();
                FileStoreTable table = this.createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option);
                tableToCompaction.put(Identifier.create((String)dbName, (String)tableName), table);
            }
        }
        String string = invoker;
        int n = -1;
        switch (string.hashCode()) {
            case -1422950858: {
                if (!string.equals("action")) break;
                boolean bl = false;
                break;
            }
            case 2078404485: {
                if (!string.equals("procedure_indexed")) break;
                boolean bl = true;
                break;
            }
            case 15340237: {
                if (!string.equals("procedure_named")) break;
                int n2 = 2;
            }
        }
        switch (var5_10) {
            case 0: {
                StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
                this.createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", this.warehouse, "--mode", mode).withStreamExecutionEnvironment(env).build();
                env.executeAsync();
                break;
            }
            case 1: {
                this.executeSQL(String.format("CALL sys.compact_database('', '%s')", mode), true, false);
                break;
            }
            case 2: {
                this.executeSQL(String.format("CALL sys.compact_database(mode => '%s')", mode), true, false);
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        for (Map.Entry entry : tableToCompaction.entrySet()) {
            FileStoreTable table = (FileStoreTable)entry.getValue();
            snapshotManager = table.snapshotManager();
            StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = streamWriteBuilder.newWrite();
            this.commit = streamWriteBuilder.newCommit();
            this.writeData(this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
            this.writeData(this.rowData(2, 100, 15, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 16, BinaryString.fromString((String)"20221208")), this.rowData(2, 100, 15, BinaryString.fromString((String)"20221209")));
            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
            Assertions.assertThat((long)snapshot.id()).isEqualTo(2L);
            Assertions.assertThat((Comparable)snapshot.commitKind()).isEqualTo((Object)Snapshot.CommitKind.APPEND);
            this.write.close();
            this.commit.close();
        }
        for (Map.Entry entry : tableToCompaction.entrySet()) {
            FileStoreTable table = (FileStoreTable)entry.getValue();
            snapshotManager = table.snapshotManager();
            while (snapshotManager.latestSnapshotId() == 2L) {
                Thread.sleep(1000L);
            }
            this.validateResult(table, ROW_TYPE, table.newReadBuilder().newStreamScan(), Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]", "+I[1, 100, 16, 20221208]", "+I[2, 100, 15, 20221208]", "+I[2, 100, 15, 20221209]", "+I[2, 100, 16, 20221208]"), 60000L);
        }
    }

    /**
     * Runs a batch {@code compact_database} job over all tables and checks each one ends up with a
     * COMPACT snapshot holding one data file per split.
     *
     * <p>Every table receives two appends (snapshot 2) before the job runs. Tables whose name ends
     * with "unaware_bucket" are created with {@code bucket = -1} and no primary key; the others use
     * {@code bucket = 1} and primary key (dt, hh, k).
     *
     * @param mode compaction mode forwarded to the job ("combined" or "divided")
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @throws Exception on any table, job or validation failure
     */
    @ParameterizedTest(name = "mode = {0}, invoker = {1}")
    @MethodSource("testData")
    @Timeout(60)
    public void testBatchCompact(String mode, String invoker) throws Exception {
        List<FileStoreTable> tables = new ArrayList<>();
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                Map<String, String> option = new HashMap<>();
                option.put(CoreOptions.WRITE_ONLY.key(), "true");
                // createTable takes List<String>, so the key list must be typed accordingly.
                List<String> keys;
                if (tableName.endsWith("unaware_bucket")) {
                    option.put("bucket", "-1");
                    option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
                    keys = new ArrayList<>();
                } else {
                    option.put("bucket", "1");
                    keys = Arrays.asList("dt", "hh", "k");
                }
                FileStoreTable table =
                        this.createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option);
                tables.add(table);

                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder streamWriteBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = streamWriteBuilder.newWrite();
                this.commit = streamWriteBuilder.newCommit();
                this.writeData(
                        this.rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        this.rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        this.rowData(1, 100, 15, BinaryString.fromString("20221209")));
                this.writeData(
                        this.rowData(2, 100, 15, BinaryString.fromString("20221208")),
                        this.rowData(2, 100, 16, BinaryString.fromString("20221208")),
                        this.rowData(2, 100, 15, BinaryString.fromString("20221209")));
                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                Assertions.assertThat(snapshot.id()).isEqualTo(2L);
                Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
                this.write.close();
                this.commit.close();
            }
        }

        // Run the batch compaction through the requested entry point.
        switch (invoker) {
            case "action":
                {
                    StreamExecutionEnvironment env =
                            this.streamExecutionEnvironmentBuilder().batchMode().build();
                    this.createAction(
                                    CompactDatabaseAction.class,
                                    "compact_database",
                                    "--warehouse",
                                    this.warehouse,
                                    "--mode",
                                    mode)
                            .withStreamExecutionEnvironment(env)
                            .build();
                    env.execute();
                    break;
                }
            case "procedure_indexed":
                this.executeSQL(
                        String.format("CALL sys.compact_database('', '%s')", mode), false, true);
                break;
            case "procedure_named":
                this.executeSQL(
                        String.format("CALL sys.compact_database(mode => '%s')", mode),
                        false,
                        true);
                break;
            default:
                throw new UnsupportedOperationException(invoker);
        }

        // Snapshot 3 must be the compaction; the three written partitions give three splits,
        // each compacted to a single file.
        for (FileStoreTable table : tables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(snapshot.id()).isEqualTo(3L);
            Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(1);
            }
        }
    }

    /**
     * Exercises a long-running streaming {@code compact_database} job on primary-key tables that
     * use the "full-compaction" changelog producer.
     *
     * <p>Flow: write one batch per table and confirm a stream scan sees nothing yet; start the
     * job; validate the inserted rows; write an update batch and validate the -U/+U changelog;
     * wait until snapshot expiration leaves exactly three snapshots. In "combined" mode the same
     * sequence is repeated for tables that are created only after the job has been started.
     *
     * @param mode compaction mode ("combined" or "divided")
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @throws Exception on any table, job or validation failure
     */
    @ParameterizedTest(name = "mode = {0}, invoker = {1}")
    @MethodSource("testData")
    public void testStreamingCompact(String mode, String invoker) throws Exception {
        HashMap<String, String> options = new HashMap<>();
        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        options.put(
                FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(),
                "1s");
        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        // Keep exactly three snapshots so expiration can be observed below.
        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        options.put("bucket", "1");

        List<FileStoreTable> existingTables = new ArrayList<>();
        for (String db : DATABASE_NAMES) {
            for (String name : TABLE_NAMES) {
                FileStoreTable created =
                        createTable(
                                db,
                                name,
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                existingTables.add(created);

                SnapshotManager snapshots = created.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        created.newStreamWriteBuilder().withCommitUser(commitUser);
                write = writeBuilder.newWrite();
                commit = writeBuilder.newCommit();
                writeData(
                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        rowData(1, 100, 15, BinaryString.fromString("20221209")));

                Snapshot latest = snapshots.snapshot(snapshots.latestSnapshotId());
                Assertions.assertThat(latest.id()).isEqualTo(1L);
                Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

                // No compaction has run yet, so the stream scan's first plan has no splits.
                StreamTableScan initialScan = created.newReadBuilder().newStreamScan();
                TableScan.Plan initialPlan = initialScan.plan();
                Assertions.assertThat(initialPlan.splits()).isEmpty();

                write.close();
                commit.close();
            }
        }

        switch (invoker) {
            case "action":
                {
                    CompactDatabaseAction action;
                    if (mode.equals("divided")) {
                        action =
                                createAction(
                                        CompactDatabaseAction.class,
                                        "compact_database",
                                        "--warehouse",
                                        warehouse);
                    } else {
                        action =
                                createAction(
                                        CompactDatabaseAction.class,
                                        "compact_database",
                                        "--warehouse",
                                        warehouse,
                                        "--mode",
                                        "combined",
                                        "--table_conf",
                                        CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
                    }
                    StreamExecutionEnvironment env =
                            streamExecutionEnvironmentBuilder().streamingMode().build();
                    action.withStreamExecutionEnvironment(env).build();
                    env.executeAsync();
                    break;
                }
            case "procedure_indexed":
                if (mode.equals("divided")) {
                    executeSQL("CALL sys.compact_database()", true, false);
                } else {
                    executeSQL(
                            "CALL sys.compact_database('', 'combined', '', '', 'continuous.discovery-interval=1s')",
                            true,
                            false);
                }
                break;
            case "procedure_named":
                if (mode.equals("divided")) {
                    executeSQL("CALL sys.compact_database()", true, false);
                } else {
                    executeSQL(
                            "CALL sys.compact_database(mode => 'combined', table_options => 'continuous.discovery-interval=1s')",
                            true,
                            false);
                }
                break;
            default:
                throw new UnsupportedOperationException(invoker);
        }

        for (FileStoreTable current : existingTables) {
            StreamTableScan changelogScan = current.newReadBuilder().newStreamScan();
            validateResult(
                    current,
                    ROW_TYPE,
                    changelogScan,
                    Arrays.asList(
                            "+I[1, 100, 15, 20221208]",
                            "+I[1, 100, 15, 20221209]",
                            "+I[1, 100, 16, 20221208]"),
                    60000L);

            SnapshotManager snapshots = current.snapshotManager();
            StreamWriteBuilder writeBuilder =
                    current.newStreamWriteBuilder().withCommitUser(commitUser);
            write = writeBuilder.newWrite();
            commit = writeBuilder.newCommit();
            // Same keys, new value 101: the job must emit update-before/update-after pairs.
            writeData(
                    rowData(1, 101, 15, BinaryString.fromString("20221208")),
                    rowData(1, 101, 16, BinaryString.fromString("20221208")),
                    rowData(1, 101, 15, BinaryString.fromString("20221209")));
            validateResult(
                    current,
                    ROW_TYPE,
                    changelogScan,
                    Arrays.asList(
                            "+U[1, 101, 15, 20221208]",
                            "+U[1, 101, 15, 20221209]",
                            "+U[1, 101, 16, 20221208]",
                            "-U[1, 100, 15, 20221208]",
                            "-U[1, 100, 15, 20221209]",
                            "-U[1, 100, 16, 20221208]"),
                    60000L);

            // Expiration is done once earliest and latest snapshot ids are exactly two apart.
            CommonTestUtils.waitUtil(
                    () -> snapshots.latestSnapshotId() - 2L == snapshots.earliestSnapshotId(),
                    Duration.ofSeconds(60L),
                    Duration.ofMillis(100L),
                    String.format(
                            "Cannot validate snapshot expiration in %s milliseconds.", 60000));
            write.close();
            commit.close();
        }

        if (mode.equals("combined")) {
            // Tables created while the job is already running.
            List<FileStoreTable> lateTables = new ArrayList<>();
            for (String db : New_DATABASE_NAMES) {
                for (String name : New_TABLE_NAMES) {
                    FileStoreTable created =
                            createTable(
                                    db,
                                    name,
                                    Arrays.asList("dt", "hh"),
                                    Arrays.asList("dt", "hh", "k"),
                                    options);
                    lateTables.add(created);

                    SnapshotManager snapshots = created.snapshotManager();
                    StreamWriteBuilder writeBuilder =
                            created.newStreamWriteBuilder().withCommitUser(commitUser);
                    write = writeBuilder.newWrite();
                    commit = writeBuilder.newCommit();
                    writeData(
                            write,
                            commit,
                            0L,
                            rowData(1, 100, 15, BinaryString.fromString("20221208")),
                            rowData(1, 100, 16, BinaryString.fromString("20221208")),
                            rowData(1, 100, 15, BinaryString.fromString("20221209")));

                    Snapshot latest = snapshots.snapshot(snapshots.latestSnapshotId());
                    Assertions.assertThat(latest.id()).isEqualTo(1L);
                    Assertions.assertThat(latest.commitKind())
                            .isEqualTo(Snapshot.CommitKind.APPEND);
                    write.close();
                    commit.close();
                }
            }

            for (FileStoreTable current : lateTables) {
                StreamTableScan changelogScan = current.newReadBuilder().newStreamScan();
                validateResult(
                        current,
                        ROW_TYPE,
                        changelogScan,
                        Arrays.asList(
                                "+I[1, 100, 15, 20221208]",
                                "+I[1, 100, 15, 20221209]",
                                "+I[1, 100, 16, 20221208]"),
                        60000L);

                SnapshotManager snapshots = current.snapshotManager();
                StreamWriteBuilder writeBuilder =
                        current.newStreamWriteBuilder().withCommitUser(commitUser);
                write = writeBuilder.newWrite();
                commit = writeBuilder.newCommit();
                writeData(
                        write,
                        commit,
                        1L,
                        rowData(1, 101, 15, BinaryString.fromString("20221208")),
                        rowData(1, 101, 16, BinaryString.fromString("20221208")),
                        rowData(1, 101, 15, BinaryString.fromString("20221209")));
                validateResult(
                        current,
                        ROW_TYPE,
                        changelogScan,
                        Arrays.asList(
                                "+U[1, 101, 15, 20221208]",
                                "+U[1, 101, 15, 20221209]",
                                "+U[1, 101, 16, 20221208]",
                                "-U[1, 100, 15, 20221208]",
                                "-U[1, 100, 15, 20221209]",
                                "-U[1, 100, 16, 20221208]"),
                        60000L);
                CommonTestUtils.waitUtil(
                        () -> snapshots.latestSnapshotId() - 2L == snapshots.earliestSnapshotId(),
                        Duration.ofSeconds(60L),
                        Duration.ofMillis(100L),
                        String.format(
                                "Cannot validate snapshot expiration in %s milliseconds.", 60000));
                write.close();
                commit.close();
            }
        }
    }

    /**
     * Checks that a batch {@code compact_database} job started with a partition idle time only
     * compacts partitions that were not written to recently.
     *
     * <p>Every table gets two appends, then the test sleeps 10 seconds (matching the "10s" idle
     * time) and writes one more row into partition (dt=20221208, hh=16). After the job runs, that
     * partition is expected to still hold three files while the other two hold one file each.
     *
     * @param mode compaction mode forwarded to the job ("combined" or "divided")
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @throws Exception on any table, job or validation failure
     */
    @ParameterizedTest(name = "mode = {0}, invoker = {1}")
    @MethodSource("testData")
    @Timeout(60)
    public void testHistoryPartitionCompact(String mode, String invoker) throws Exception {
        List<FileStoreTable> tables = new ArrayList<>();
        String partitionIdleTime = "10s";
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                Map<String, String> option = new HashMap<>();
                option.put(CoreOptions.WRITE_ONLY.key(), "true");
                // createTable takes List<String>, so the key list must be typed accordingly.
                List<String> keys;
                if (tableName.endsWith("unaware_bucket")) {
                    option.put("bucket", "-1");
                    option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
                    keys = new ArrayList<>();
                } else {
                    option.put("bucket", "1");
                    keys = Arrays.asList("dt", "hh", "k");
                }
                FileStoreTable table =
                        this.createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option);
                tables.add(table);

                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder streamWriteBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = streamWriteBuilder.newWrite();
                this.commit = streamWriteBuilder.newCommit();
                this.writeData(
                        this.rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        this.rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        this.rowData(1, 100, 15, BinaryString.fromString("20221209")));
                this.writeData(
                        this.rowData(2, 100, 15, BinaryString.fromString("20221208")),
                        this.rowData(2, 100, 16, BinaryString.fromString("20221208")),
                        this.rowData(2, 100, 15, BinaryString.fromString("20221209")));
                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                Assertions.assertThat(snapshot.id()).isEqualTo(2L);
                Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
                this.write.close();
                this.commit.close();
            }
        }

        // Let the partitions written above age past the idle time, then touch only
        // partition (20221208, 16) so it counts as recently written.
        Thread.sleep(10000L);
        for (FileStoreTable table : tables) {
            StreamWriteBuilder streamWriteBuilder =
                    table.newStreamWriteBuilder().withCommitUser(this.commitUser);
            this.write = streamWriteBuilder.newWrite();
            this.commit = streamWriteBuilder.newCommit();
            this.writeData(this.rowData(3, 100, 16, BinaryString.fromString("20221208")));
            // Release the writer/committer of this table before the next iteration
            // overwrites the fields, as every other write loop in this class does.
            this.write.close();
            this.commit.close();
        }

        switch (invoker) {
            case "action":
                {
                    StreamExecutionEnvironment env =
                            this.streamExecutionEnvironmentBuilder().batchMode().build();
                    this.createAction(
                                    CompactDatabaseAction.class,
                                    "compact_database",
                                    "--warehouse",
                                    this.warehouse,
                                    "--mode",
                                    mode,
                                    "--partition_idle_time",
                                    partitionIdleTime)
                            .withStreamExecutionEnvironment(env)
                            .build();
                    env.execute();
                    break;
                }
            case "procedure_indexed":
                this.executeSQL(
                        String.format(
                                "CALL sys.compact_database('', '%s','','','','%s')",
                                mode, partitionIdleTime),
                        false,
                        true);
                break;
            case "procedure_named":
                this.executeSQL(
                        String.format(
                                "CALL sys.compact_database(mode => '%s', partition_idle_time => '%s')",
                                mode, partitionIdleTime),
                        false,
                        true);
                break;
            default:
                throw new UnsupportedOperationException(invoker);
        }

        // Three appends plus the compaction -> snapshot 4 of kind COMPACT.
        for (FileStoreTable table : tables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(snapshot.id()).isEqualTo(4L);
            Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                // Partition field 1 is "hh"; hh = 16 is the partition touched after the sleep.
                int expectedFiles = split.partition().getInt(1) == 16 ? 3 : 1;
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(expectedFiles);
            }
        }
    }

    /**
     * Runs the include/exclude scenario with only an including pattern ("db1.t1"): db1.t1 must be
     * compacted while db1.t2, db2.t1 and db2.t2 must stay untouched.
     *
     * @param mode compaction mode ("combined" or "divided")
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @throws Exception on any table, job or validation failure
     */
    // The display name lists both parameters so the six invocations are distinguishable in
    // reports, matching the other parameterized tests in this class.
    @ParameterizedTest(name = "mode = {0}, invoker = {1}")
    @MethodSource("testData")
    @Timeout(60)
    public void includeTableCompaction(String mode, String invoker) throws Exception {
        this.includingAndExcludingTablesImpl(
                mode,
                invoker,
                "db1.t1",
                null,
                Collections.singletonList(Identifier.fromString("db1.t1")),
                Arrays.asList(
                        Identifier.fromString("db1.t2"),
                        Identifier.fromString("db2.t1"),
                        Identifier.fromString("db2.t2")));
    }

    /**
     * Runs the include/exclude scenario with only an excluding pattern ("db2.t2"): db1.t1, db1.t2
     * and db2.t1 must be compacted while db2.t2 must stay untouched.
     *
     * @param mode compaction mode ("combined" or "divided")
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @throws Exception on any table, job or validation failure
     */
    @ParameterizedTest(name = "mode = {0}, invoker = {1}")
    @MethodSource("testData")
    @Timeout(60)
    public void excludeTableCompaction(String mode, String invoker) throws Exception {
        List<Identifier> expectedCompacted =
                Arrays.asList(
                        Identifier.fromString("db1.t1"),
                        Identifier.fromString("db1.t2"),
                        Identifier.fromString("db2.t1"));
        List<Identifier> expectedUntouched =
                Collections.singletonList(Identifier.fromString("db2.t2"));

        includingAndExcludingTablesImpl(
                mode, invoker, null, "db2.t2", expectedCompacted, expectedUntouched);
    }

    /**
     * Runs the include/exclude scenario with both patterns set: including "db1.+|db2.t1" and
     * excluding "db1.t2". db1.t1 and db2.t1 must be compacted; db1.t2 and db2.t2 must not.
     *
     * @param mode compaction mode ("combined" or "divided")
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @throws Exception on any table, job or validation failure
     */
    @ParameterizedTest(name = "mode = {0}, invoker = {1}")
    @MethodSource("testData")
    @Timeout(60)
    public void includeAndExcludeTableCompaction(String mode, String invoker) throws Exception {
        List<Identifier> expectedCompacted =
                Arrays.asList(Identifier.fromString("db1.t1"), Identifier.fromString("db2.t1"));
        List<Identifier> expectedUntouched =
                Arrays.asList(Identifier.fromString("db1.t2"), Identifier.fromString("db2.t2"));

        includingAndExcludingTablesImpl(
                mode, invoker, "db1.+|db2.t1", "db1.t2", expectedCompacted, expectedUntouched);
    }

    /**
     * Shared body of the include/exclude tests: creates all tables, writes two batches into each,
     * runs a batch {@code compact_database} job with the given table patterns and then checks
     * which tables were compacted.
     *
     * <p>Tables listed in neither {@code includeTables} nor {@code excludeTables} are still created
     * and written to, but are not verified afterwards.
     *
     * @param mode compaction mode ("combined" or "divided"); "combined" additionally passes
     *     {@code continuous.discovery-interval=1s} as a table option
     * @param invoker how the job is started: "action", "procedure_indexed" or "procedure_named"
     * @param includingPattern value for the including-tables argument, or {@code null} to omit it
     *     (procedures receive an empty string instead)
     * @param excludesPattern value for the excluding-tables argument, or {@code null} to omit it
     *     (procedures receive an empty string instead)
     * @param includeTables tables expected to end with a COMPACT snapshot 3 and one file per split
     * @param excludeTables tables expected to stay at APPEND snapshot 2 with two files per split
     * @throws Exception on any table, job or validation failure
     */
    private void includingAndExcludingTablesImpl(
            String mode,
            String invoker,
            @Nullable String includingPattern,
            @Nullable String excludesPattern,
            List<Identifier> includeTables,
            List<Identifier> excludeTables)
            throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        options.put("bucket", "1");

        List<FileStoreTable> compactionTables = new ArrayList<>();
        List<FileStoreTable> noCompactionTables = new ArrayList<>();
        for (String dbName : DATABASE_NAMES) {
            for (String tableName : TABLE_NAMES) {
                FileStoreTable table =
                        this.createTable(
                                dbName,
                                tableName,
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                options);
                Identifier identifier = Identifier.create(dbName, tableName);
                if (includeTables.contains(identifier)) {
                    compactionTables.add(table);
                } else if (excludeTables.contains(identifier)) {
                    noCompactionTables.add(table);
                }

                SnapshotManager snapshotManager = table.snapshotManager();
                StreamWriteBuilder streamWriteBuilder =
                        table.newStreamWriteBuilder().withCommitUser(this.commitUser);
                this.write = streamWriteBuilder.newWrite();
                this.commit = streamWriteBuilder.newCommit();
                this.writeData(
                        this.rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        this.rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        this.rowData(1, 100, 15, BinaryString.fromString("20221209")));
                this.writeData(
                        this.rowData(2, 100, 15, BinaryString.fromString("20221208")),
                        this.rowData(2, 100, 16, BinaryString.fromString("20221208")),
                        this.rowData(2, 100, 15, BinaryString.fromString("20221209")));
                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
                Assertions.assertThat(snapshot.id()).isEqualTo(2L);
                Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
                this.write.close();
                this.commit.close();
            }
        }

        switch (invoker) {
            case "action":
                {
                    // Optional arguments are appended only when the caller supplied them.
                    List<String> args = new ArrayList<>();
                    args.add("compact_database");
                    args.add("--warehouse");
                    args.add(this.warehouse);
                    if (includingPattern != null) {
                        args.add("--including_tables");
                        args.add(includingPattern);
                    }
                    if (excludesPattern != null) {
                        args.add("--excluding_tables");
                        args.add(excludesPattern);
                    }
                    args.add("--mode");
                    args.add(mode);
                    if (mode.equals("combined")) {
                        args.add("--table_conf");
                        args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
                    }
                    StreamExecutionEnvironment env =
                            this.streamExecutionEnvironmentBuilder().batchMode().build();
                    this.createAction(CompactDatabaseAction.class, args)
                            .withStreamExecutionEnvironment(env)
                            .build();
                    env.execute();
                    break;
                }
            case "procedure_indexed":
                if (mode.equals("divided")) {
                    this.executeSQL(
                            String.format(
                                    "CALL sys.compact_database('', 'divided', '%s', '%s')",
                                    this.nonNull(includingPattern), this.nonNull(excludesPattern)),
                            false,
                            true);
                } else {
                    this.executeSQL(
                            String.format(
                                    "CALL sys.compact_database('', 'combined', '%s', '%s', 'continuous.discovery-interval=1s')",
                                    this.nonNull(includingPattern), this.nonNull(excludesPattern)),
                            false,
                            true);
                }
                break;
            case "procedure_named":
                if (mode.equals("divided")) {
                    this.executeSQL(
                            String.format(
                                    "CALL sys.compact_database(mode => 'divided', including_tables => '%s', excluding_tables => '%s')",
                                    this.nonNull(includingPattern), this.nonNull(excludesPattern)),
                            false,
                            true);
                } else {
                    this.executeSQL(
                            String.format(
                                    "CALL sys.compact_database(mode => 'combined', including_tables => '%s', excluding_tables => '%s', table_options => 'continuous.discovery-interval=1s')",
                                    this.nonNull(includingPattern), this.nonNull(excludesPattern)),
                            false,
                            true);
                }
                break;
            default:
                throw new UnsupportedOperationException(invoker);
        }

        // Selected tables: compaction committed snapshot 3 and merged each split to one file.
        for (FileStoreTable table : compactionTables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(snapshot.id()).isEqualTo(3L);
            Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(1);
            }
        }

        // Filtered-out tables: still at the second append, two files per split.
        for (FileStoreTable table : noCompactionTables) {
            SnapshotManager snapshotManager = table.snapshotManager();
            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
            Assertions.assertThat(snapshot.id()).isEqualTo(2L);
            Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(3);
            for (DataSplit split : splits) {
                Assertions.assertThat(split.dataFiles().size()).isEqualTo(2);
            }
        }
    }

    /**
     * Maps a possibly-null string to a non-null one.
     *
     * @param s the value to normalize; may be {@code null}
     * @return {@code s} itself when it is not {@code null}, otherwise the empty string
     */
    private String nonNull(@Nullable String s) {
        if (s != null) {
            return s;
        }
        return "";
    }

    /**
     * Starts a streaming database compaction over tables created with {@code bucket = -1} and
     * checks each table through {@code checkFileAndRowSize}, before and after one extra commit.
     *
     * <p>Each table first receives two identical three-row commits and is asserted to be at
     * snapshot 2 with commit kind {@code APPEND}. The compaction is then launched either through
     * {@link CompactDatabaseAction} or through the {@code sys.compact_database} procedure; which
     * one is picked at random on every run.
     *
     * <p>NOTE(review): the {@code JobClient} returned by {@code executeAsync()} is discarded, so
     * this method never cancels the job it starts - confirm that test teardown takes care of it.
     *
     * @throws Exception if table creation, writing, job submission or verification fails
     */
    @Test
    public void testUnawareBucketStreamingCompact() throws Exception {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");

        List<FileStoreTable> createdTables = new ArrayList<>();
        for (String name : TABLE_NAMES) {
            FileStoreTable created =
                    createTable(
                            database,
                            name,
                            Collections.singletonList("k"),
                            Collections.emptyList(),
                            tableOptions);
            createdTables.add(created);
            SnapshotManager snapshots = created.snapshotManager();

            StreamWriteBuilder builder = created.newStreamWriteBuilder().withCommitUser(commitUser);
            write = builder.newWrite();
            commit = builder.newCommit();

            // Two commits carrying the same three rows, so the table ends up at snapshot 2.
            for (int round = 0; round < 2; round++) {
                writeData(
                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        rowData(1, 100, 15, BinaryString.fromString("20221209")));
            }

            long latestId = snapshots.latestSnapshotId();
            Snapshot latest = snapshots.snapshot(latestId);
            Assertions.assertThat(latest.id()).isEqualTo(2L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

            write.close();
            commit.close();
        }

        // Exercise one of the two entry points, chosen at random.
        boolean viaAction = ThreadLocalRandom.current().nextBoolean();
        if (!viaAction) {
            executeSQL("CALL sys.compact_database()");
        } else {
            StreamExecutionEnvironment streamingEnv =
                    streamExecutionEnvironmentBuilder().streamingMode().build();
            createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse)
                    .withStreamExecutionEnvironment(streamingEnv)
                    .build();
            streamingEnv.executeAsync();
        }

        for (FileStoreTable target : createdTables) {
            StreamWriteBuilder builder = target.newStreamWriteBuilder().withCommitUser(commitUser);
            write = builder.newWrite();
            commit = builder.newCommit();

            // presumably: expected snapshot id, timeout in ms, file count, row count - confirm
            // against checkFileAndRowSize
            checkFileAndRowSize(target, 3L, 30000L, 1, 6L);
            writeData(
                    rowData(1, 101, 15, BinaryString.fromString("20221208")),
                    rowData(1, 101, 16, BinaryString.fromString("20221208")),
                    rowData(1, 101, 15, BinaryString.fromString("20221209")));
            checkFileAndRowSize(target, 5L, 30000L, 1, 9L);

            write.close();
            commit.close();
        }
    }

    /**
     * Runs a batch database compaction over tables created with {@code bucket = -1} and checks
     * every table afterwards through {@code checkFileAndRowSize}.
     *
     * <p>Each table first receives two identical three-row commits and is asserted to be at
     * snapshot 2 with commit kind {@code APPEND}. The compaction then runs either through
     * {@link CompactDatabaseAction} on a batch-mode environment (blocking {@code execute()}) or
     * through the {@code sys.compact_database} procedure; which one is picked at random.
     *
     * @throws Exception if table creation, writing, job execution or verification fails
     */
    @Test
    public void testUnawareBucketBatchCompact() throws Exception {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");

        List<FileStoreTable> createdTables = new ArrayList<>();
        for (String name : TABLE_NAMES) {
            FileStoreTable created =
                    createTable(
                            database,
                            name,
                            Collections.singletonList("k"),
                            Collections.emptyList(),
                            tableOptions);
            createdTables.add(created);
            SnapshotManager snapshots = created.snapshotManager();

            StreamWriteBuilder builder = created.newStreamWriteBuilder().withCommitUser(commitUser);
            write = builder.newWrite();
            commit = builder.newCommit();

            // Two commits carrying the same three rows, so the table ends up at snapshot 2.
            for (int round = 0; round < 2; round++) {
                writeData(
                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
                        rowData(1, 100, 15, BinaryString.fromString("20221209")));
            }

            long latestId = snapshots.latestSnapshotId();
            Snapshot latest = snapshots.snapshot(latestId);
            Assertions.assertThat(latest.id()).isEqualTo(2L);
            Assertions.assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

            write.close();
            commit.close();
        }

        // Exercise one of the two entry points, chosen at random.
        boolean viaAction = ThreadLocalRandom.current().nextBoolean();
        if (!viaAction) {
            executeSQL("CALL sys.compact_database()", false, true);
        } else {
            StreamExecutionEnvironment batchEnv =
                    streamExecutionEnvironmentBuilder().batchMode().build();
            createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse)
                    .withStreamExecutionEnvironment(batchEnv)
                    .build();
            batchEnv.execute();
        }

        for (FileStoreTable target : createdTables) {
            // presumably: expected snapshot id, timeout in ms, file count, row count - confirm
            // against checkFileAndRowSize
            checkFileAndRowSize(target, 3L, 0L, 1, 6L);
        }
    }

    /**
     * Runs {@code compact_database} in {@code combined} mode with {@code --table_conf} overrides
     * and checks that the overrides show up in the compaction result.
     *
     * <p>The table is created write-only with {@code snapshot.num-retained.min = 1000} and gets
     * ten single-row commits (snapshots 1 to 10). The action is then started with table options
     * that switch the file format to avro and set both snapshot retention bounds to 3. The test
     * waits for snapshot 11 to appear as a {@code COMPACT} commit, waits for the earliest
     * snapshot to become 9, and finally checks that the single remaining split contains at least
     * one avro file.
     *
     * <p>The submitted job is cancelled in a {@code finally} block so that a failed assertion or a
     * wait timeout does not leave it running.
     *
     * @param type {@code "pk"} to create the table with keys {@code dt, hh, k} and
     *     {@code bucket = 1}; any other value creates it with no keys, {@code bucket = -1} and
     *     {@code compaction.min.file-num = 2}
     * @throws Exception if table creation, writing, job submission or verification fails
     */
    @ParameterizedTest(name = "type = {0}")
    @ValueSource(strings = {"pk", "unaware"})
    public void testCombinedModeWithDynamicOptions(String type) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1000");

        List<String> keys;
        if (type.equals("pk")) {
            options.put(CoreOptions.BUCKET.key(), "1");
            keys = Arrays.asList("dt", "hh", "k");
        } else {
            options.put(CoreOptions.BUCKET.key(), "-1");
            options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
            keys = Collections.emptyList();
        }

        FileStoreTable table = createTable("test_db", "t", Arrays.asList("dt", "hh"), keys, options);
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
        write = streamWriteBuilder.newWrite();
        commit = streamWriteBuilder.newCommit();

        // One commit per row, producing snapshots 1 to 10.
        for (int i = 0; i < 10; ++i) {
            writeData(rowData(1, i, 15, BinaryString.fromString("20221208")));
        }

        SnapshotManager snapshotManager = table.snapshotManager();
        Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10L);

        CompactDatabaseAction action =
                createAction(
                        CompactDatabaseAction.class,
                        "compact_database",
                        "--warehouse",
                        warehouse,
                        "--mode",
                        "combined",
                        "--table_conf",
                        CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s",
                        "--table_conf",
                        CoreOptions.FILE_FORMAT.key() + "=avro",
                        "--table_conf",
                        CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key() + "=3",
                        "--table_conf",
                        CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key() + "=3");
        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
        action.withStreamExecutionEnvironment(env).build();

        JobClient jobClient = env.executeAsync();
        try {
            CommonTestUtils.waitUtil(
                    () -> snapshotManager.latestSnapshotId() == 11L,
                    Duration.ofSeconds(240L),
                    Duration.ofMillis(500L));
            Assertions.assertThat(snapshotManager.latestSnapshot().commitKind())
                    .isEqualTo(Snapshot.CommitKind.COMPACT);

            // With retention overridden to 3 and the latest snapshot at 11, the earliest one
            // left after expiration is 9.
            CommonTestUtils.waitUtil(
                    () -> snapshotManager.earliestSnapshotId() == 9L,
                    Duration.ofSeconds(240L),
                    Duration.ofMillis(500L),
                    "Failed to wait snapshot expiration success");

            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
            Assertions.assertThat(splits.size()).isEqualTo(1);
            boolean hasAvroFile =
                    splits.get(0).dataFiles().stream()
                            .anyMatch(file -> file.fileFormat().equalsIgnoreCase("avro"));
            Assertions.assertThat(hasAvroFile).isTrue();
        } finally {
            // Always release the asynchronously submitted job, also when a check above failed.
            jobClient.cancel();
        }
    }

    /**
     * Writes the given rows through {@code write} and commits them with {@code commit} under the
     * given identifier.
     *
     * @param write writer that receives every row and prepares the commit messages
     * @param commit committer that receives the prepared messages
     * @param incrementalIdentifier identifier passed to both {@code prepareCommit} and
     *     {@code commit}
     * @param data rows to write, in order
     * @throws Exception if writing, preparing the commit or committing fails
     */
    private void writeData(StreamTableWrite write, StreamTableCommit commit, long incrementalIdentifier, GenericRow ... data) throws Exception {
        for (int i = 0; i < data.length; i++) {
            InternalRow row = data[i];
            write.write(row);
        }
        // NOTE(review): the literal true is presumably a "wait for compaction" flag - confirm
        // against StreamTableWrite.prepareCommit.
        commit.commit(incrementalIdentifier, write.prepareCommit(true, incrementalIdentifier));
    }
}

