/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class IncrementalClusterActionITCase
extends ActionITCaseBase {
    @Test
    public void testClusterUnpartitionedTable() throws Exception {
        FileStoreTable table = this.createTable(null, 1);
        BinaryString randomStr = BinaryString.fromString((String)IncrementalClusterActionITCase.randomString(150));
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        for (int i = 0; i < 3; ++i) {
            for (int j = 0; j < 3; ++j) {
                messages.addAll(this.write(GenericRow.of((Object[])new Object[]{i, j, randomStr, 0})));
            }
        }
        this.commit(messages);
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[]{0, 1});
        List<String> result1 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        ArrayList expected1 = Lists.newArrayList((Object[])new String[]{"+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 0]", "+I[1, 1]", "+I[1, 2]", "+I[2, 0]", "+I[2, 1]", "+I[2, 2]"});
        Assertions.assertThat(result1).containsExactlyElementsOf((Iterable)expected1);
        this.runAction(Collections.emptyList());
        this.checkSnapshot(table);
        List splits = readBuilder.newScan().plan().splits();
        Assertions.assertThat((int)splits.size()).isEqualTo(1);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isEqualTo(1);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
        List<String> result2 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList expected2 = Lists.newArrayList((Object[])new String[]{"+I[0, 0]", "+I[0, 1]", "+I[1, 0]", "+I[1, 1]", "+I[0, 2]", "+I[1, 2]", "+I[2, 0]", "+I[2, 1]", "+I[2, 2]"});
        Assertions.assertThat(result2).containsExactlyElementsOf((Iterable)expected2);
        messages.clear();
        messages.addAll(this.write(GenericRow.of((Object[])new Object[]{0, 3, null, 0}), GenericRow.of((Object[])new Object[]{1, 3, null, 0}), GenericRow.of((Object[])new Object[]{2, 3, null, 0})));
        messages.addAll(this.write(GenericRow.of((Object[])new Object[]{3, 0, null, 0}), GenericRow.of((Object[])new Object[]{3, 1, null, 0}), GenericRow.of((Object[])new Object[]{3, 2, null, 0}), GenericRow.of((Object[])new Object[]{3, 3, null, 0})));
        this.commit(messages);
        List<String> result3 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        ArrayList expected3 = new ArrayList(expected2);
        expected3.addAll(Lists.newArrayList((Object[])new String[]{"+I[0, 3]", "+I[1, 3]", "+I[2, 3]", "+I[3, 0]", "+I[3, 1]", "+I[3, 2]", "+I[3, 3]"}));
        Assertions.assertThat(result3).containsExactlyElementsOf(expected3);
        this.runAction(Collections.emptyList());
        this.checkSnapshot(table);
        splits = readBuilder.newScan().plan().splits();
        List<String> result4 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList expected4 = new ArrayList(expected2);
        expected4.addAll(Lists.newArrayList((Object[])new String[]{"+I[0, 3]", "+I[1, 3]", "+I[3, 0]", "+I[3, 1]", "+I[2, 3]", "+I[3, 2]", "+I[3, 3]"}));
        Assertions.assertThat((int)splits.size()).isEqualTo(1);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isEqualTo(2);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(1)).level()).isEqualTo(4);
        Assertions.assertThat(result4).containsExactlyElementsOf(expected4);
        this.runAction(Lists.newArrayList((Object[])new String[]{"--compact_strategy", "full"}));
        this.checkSnapshot(table);
        splits = readBuilder.newScan().plan().splits();
        List<String> result5 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList expected5 = new ArrayList(Lists.newArrayList((Object[])new String[]{"+I[0, 0]", "+I[0, 1]", "+I[1, 0]", "+I[1, 1]", "+I[0, 2]", "+I[0, 3]", "+I[1, 2]", "+I[1, 3]", "+I[2, 0]", "+I[2, 1]", "+I[3, 0]", "+I[3, 1]", "+I[2, 2]", "+I[2, 3]", "+I[3, 2]", "+I[3, 3]"}));
        Assertions.assertThat((int)splits.size()).isEqualTo(1);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isEqualTo(1);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
        Assertions.assertThat(result5).containsExactlyElementsOf(expected5);
    }

    @Test
    public void testClusterPartitionedTable() throws Exception {
        int pt;
        FileStoreTable table = this.createTable("pt", 1);
        BinaryString randomStr = BinaryString.fromString((String)IncrementalClusterActionITCase.randomString(150));
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        ArrayList<String> expected1 = new ArrayList<String>();
        for (int pt2 = 0; pt2 < 2; ++pt2) {
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 3; ++j) {
                    messages.addAll(this.write(GenericRow.of((Object[])new Object[]{i, j, pt2 == 0 ? randomStr : null, pt2})));
                    expected1.add(String.format("+I[%s, %s, %s]", i, j, pt2));
                }
            }
        }
        this.commit(messages);
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[]{0, 1, 3});
        List<String> result1 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        Assertions.assertThat(result1).containsExactlyElementsOf(expected1);
        this.runAction(Collections.emptyList());
        this.checkSnapshot(table);
        List splits = readBuilder.newScan().plan().splits();
        Assertions.assertThat((int)splits.size()).isEqualTo(2);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isEqualTo(1);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
        List<String> result2 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList<String> expected2 = new ArrayList<String>();
        for (pt = 0; pt < 2; ++pt) {
            expected2.add(String.format("+I[0, 0, %s]", pt));
            expected2.add(String.format("+I[0, 1, %s]", pt));
            expected2.add(String.format("+I[1, 0, %s]", pt));
            expected2.add(String.format("+I[1, 1, %s]", pt));
            expected2.add(String.format("+I[0, 2, %s]", pt));
            expected2.add(String.format("+I[1, 2, %s]", pt));
            expected2.add(String.format("+I[2, 0, %s]", pt));
            expected2.add(String.format("+I[2, 1, %s]", pt));
            expected2.add(String.format("+I[2, 2, %s]", pt));
        }
        Assertions.assertThat(result2).containsExactlyElementsOf(expected2);
        messages.clear();
        for (pt = 0; pt < 2; ++pt) {
            messages.addAll(this.write(GenericRow.of((Object[])new Object[]{0, 3, null, pt}), GenericRow.of((Object[])new Object[]{1, 3, null, pt}), GenericRow.of((Object[])new Object[]{2, 3, null, pt})));
            messages.addAll(this.write(GenericRow.of((Object[])new Object[]{3, 0, null, pt}), GenericRow.of((Object[])new Object[]{3, 1, null, pt}), GenericRow.of((Object[])new Object[]{3, 2, null, pt}), GenericRow.of((Object[])new Object[]{3, 3, null, pt})));
        }
        this.commit(messages);
        List<String> result3 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        ArrayList expected3 = new ArrayList();
        for (int pt3 = 0; pt3 < 2; ++pt3) {
            expected3.addAll(expected2.subList(9 * pt3, 9 * pt3 + 9));
            expected3.add(String.format("+I[0, 3, %s]", pt3));
            expected3.add(String.format("+I[1, 3, %s]", pt3));
            expected3.add(String.format("+I[2, 3, %s]", pt3));
            expected3.add(String.format("+I[3, 0, %s]", pt3));
            expected3.add(String.format("+I[3, 1, %s]", pt3));
            expected3.add(String.format("+I[3, 2, %s]", pt3));
            expected3.add(String.format("+I[3, 3, %s]", pt3));
        }
        Assertions.assertThat(result3).containsExactlyElementsOf(expected3);
        this.runAction(Collections.emptyList());
        this.checkSnapshot(table);
        splits = readBuilder.newScan().plan().splits();
        List<String> result4 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList<String> expected4 = new ArrayList<String>();
        expected4.add("+I[0, 0, 0]");
        expected4.add("+I[0, 1, 0]");
        expected4.add("+I[1, 0, 0]");
        expected4.add("+I[1, 1, 0]");
        expected4.add("+I[0, 2, 0]");
        expected4.add("+I[1, 2, 0]");
        expected4.add("+I[2, 0, 0]");
        expected4.add("+I[2, 1, 0]");
        expected4.add("+I[2, 2, 0]");
        expected4.add("+I[0, 3, 0]");
        expected4.add("+I[1, 3, 0]");
        expected4.add("+I[3, 0, 0]");
        expected4.add("+I[3, 1, 0]");
        expected4.add("+I[2, 3, 0]");
        expected4.add("+I[3, 2, 0]");
        expected4.add("+I[3, 3, 0]");
        expected4.add("+I[0, 0, 1]");
        expected4.add("+I[0, 1, 1]");
        expected4.add("+I[1, 0, 1]");
        expected4.add("+I[1, 1, 1]");
        expected4.add("+I[0, 2, 1]");
        expected4.add("+I[0, 3, 1]");
        expected4.add("+I[1, 2, 1]");
        expected4.add("+I[1, 3, 1]");
        expected4.add("+I[2, 0, 1]");
        expected4.add("+I[2, 1, 1]");
        expected4.add("+I[3, 0, 1]");
        expected4.add("+I[3, 1, 1]");
        expected4.add("+I[2, 2, 1]");
        expected4.add("+I[2, 3, 1]");
        expected4.add("+I[3, 2, 1]");
        expected4.add("+I[3, 3, 1]");
        Assertions.assertThat((int)splits.size()).isEqualTo(2);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isEqualTo(2);
        Assertions.assertThat((int)((DataSplit)splits.get(1)).dataFiles().size()).isEqualTo(1);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(1)).level()).isEqualTo(4);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(1)).dataFiles().get(0)).level()).isEqualTo(5);
        Assertions.assertThat(result4).containsExactlyElementsOf(expected4);
    }

    @Test
    public void testClusterSpecifyPartition() throws Exception {
        FileStoreTable table = this.createTable("pt", 1);
        BinaryString randomStr = BinaryString.fromString((String)IncrementalClusterActionITCase.randomString(150));
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        ArrayList<String> expected1 = new ArrayList<String>();
        for (int pt = 0; pt < 2; ++pt) {
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 3; ++j) {
                    messages.addAll(this.write(GenericRow.of((Object[])new Object[]{i, j, pt == 0 ? randomStr : null, pt})));
                    expected1.add(String.format("+I[%s, %s, %s]", i, j, pt));
                }
            }
        }
        this.commit(messages);
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[]{0, 1, 3});
        List<String> result1 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        Assertions.assertThat(result1).containsExactlyElementsOf(expected1);
        this.runAction(Lists.newArrayList((Object[])new String[]{"--partition", "pt=0", "--compact_strategy", "full"}));
        this.checkSnapshot(table);
        List splits = readBuilder.newScan().plan().splits();
        Assertions.assertThat((int)splits.size()).isEqualTo(2);
        for (Split split : splits) {
            DataSplit dataSplit = (DataSplit)split;
            if (dataSplit.partition().getInt(0) == 0) {
                Assertions.assertThat((int)dataSplit.dataFiles().size()).isEqualTo(1);
                Assertions.assertThat((int)((DataFileMeta)dataSplit.dataFiles().get(0)).level()).isEqualTo(5);
                continue;
            }
            Assertions.assertThat((int)dataSplit.dataFiles().size()).isGreaterThan(1);
            Assertions.assertThat((int)((DataFileMeta)dataSplit.dataFiles().get(0)).level()).isEqualTo(0);
        }
    }

    @Test
    public void testClusterHistoryPartition() throws Exception {
        int pt;
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "3s");
        FileStoreTable table = this.createTable("pt", 1, options);
        BinaryString randomStr = BinaryString.fromString((String)IncrementalClusterActionITCase.randomString(150));
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        ArrayList<String> expected1 = new ArrayList<String>();
        for (int pt2 = 0; pt2 < 4; ++pt2) {
            for (int i = 0; i < 3; ++i) {
                for (int j = 0; j < 3; ++j) {
                    messages.addAll(this.write(GenericRow.of((Object[])new Object[]{i, j, randomStr, pt2})));
                    expected1.add(String.format("+I[%s, %s, %s]", i, j, pt2));
                }
            }
        }
        this.commit(messages);
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[]{0, 1, 3});
        List<String> result1 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        Assertions.assertThat(result1).containsExactlyElementsOf(expected1);
        this.runAction(Collections.emptyList());
        this.checkSnapshot(table);
        List splits = readBuilder.newScan().plan().splits();
        Assertions.assertThat((int)splits.size()).isEqualTo(4);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isEqualTo(1);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
        List<String> result2 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList<String> expected2 = new ArrayList<String>();
        for (pt = 0; pt < 4; ++pt) {
            expected2.add(String.format("+I[0, 0, %s]", pt));
            expected2.add(String.format("+I[0, 1, %s]", pt));
            expected2.add(String.format("+I[1, 0, %s]", pt));
            expected2.add(String.format("+I[1, 1, %s]", pt));
            expected2.add(String.format("+I[0, 2, %s]", pt));
            expected2.add(String.format("+I[1, 2, %s]", pt));
            expected2.add(String.format("+I[2, 0, %s]", pt));
            expected2.add(String.format("+I[2, 1, %s]", pt));
            expected2.add(String.format("+I[2, 2, %s]", pt));
        }
        Assertions.assertThat(result2).containsExactlyElementsOf(expected2);
        messages.clear();
        for (pt = 0; pt < 4; ++pt) {
            messages.addAll(this.write(GenericRow.of((Object[])new Object[]{0, 3, null, pt}), GenericRow.of((Object[])new Object[]{1, 3, null, pt}), GenericRow.of((Object[])new Object[]{2, 3, null, pt})));
            messages.addAll(this.write(GenericRow.of((Object[])new Object[]{3, 0, null, pt}), GenericRow.of((Object[])new Object[]{3, 1, null, pt}), GenericRow.of((Object[])new Object[]{3, 2, null, pt}), GenericRow.of((Object[])new Object[]{3, 3, null, pt})));
            if (pt != 1) continue;
            Thread.sleep(3000L);
        }
        this.commit(messages);
        List<String> result3 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        ArrayList expected3 = new ArrayList();
        for (int pt3 = 0; pt3 < 4; ++pt3) {
            expected3.addAll(expected2.subList(9 * pt3, 9 * pt3 + 9));
            expected3.add(String.format("+I[0, 3, %s]", pt3));
            expected3.add(String.format("+I[1, 3, %s]", pt3));
            expected3.add(String.format("+I[2, 3, %s]", pt3));
            expected3.add(String.format("+I[3, 0, %s]", pt3));
            expected3.add(String.format("+I[3, 1, %s]", pt3));
            expected3.add(String.format("+I[3, 2, %s]", pt3));
            expected3.add(String.format("+I[3, 3, %s]", pt3));
        }
        Assertions.assertThat(result3).containsExactlyElementsOf(expected3);
        this.runAction(Lists.newArrayList((Object[])new String[]{"--partition", "pt=3"}));
        this.checkSnapshot(table);
        splits = readBuilder.newScan().plan().splits();
        List<String> result4 = this.getResult(readBuilder.newRead(), splits, readBuilder.readType());
        ArrayList<String> expected4 = new ArrayList<String>();
        Assertions.assertThat((int)splits.size()).isEqualTo(4);
        for (int pt4 = 0; pt4 <= 1; ++pt4) {
            expected4.add(String.format("+I[0, 0, %s]", pt4));
            expected4.add(String.format("+I[0, 1, %s]", pt4));
            expected4.add(String.format("+I[1, 0, %s]", pt4));
            expected4.add(String.format("+I[1, 1, %s]", pt4));
            expected4.add(String.format("+I[0, 2, %s]", pt4));
            expected4.add(String.format("+I[0, 3, %s]", pt4));
            expected4.add(String.format("+I[1, 2, %s]", pt4));
            expected4.add(String.format("+I[1, 3, %s]", pt4));
            expected4.add(String.format("+I[2, 0, %s]", pt4));
            expected4.add(String.format("+I[2, 1, %s]", pt4));
            expected4.add(String.format("+I[3, 0, %s]", pt4));
            expected4.add(String.format("+I[3, 1, %s]", pt4));
            expected4.add(String.format("+I[2, 2, %s]", pt4));
            expected4.add(String.format("+I[2, 3, %s]", pt4));
            expected4.add(String.format("+I[3, 2, %s]", pt4));
            expected4.add(String.format("+I[3, 3, %s]", pt4));
            Assertions.assertThat((int)((DataSplit)splits.get(pt4)).dataFiles().size()).isEqualTo(1);
            Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(pt4)).dataFiles().get(0)).level()).isEqualTo(5);
        }
        expected4.addAll(expected3.subList(32, 48));
        Assertions.assertThat((int)((DataSplit)splits.get(2)).dataFiles().size()).isEqualTo(3);
        expected4.add("+I[0, 0, 3]");
        expected4.add("+I[0, 1, 3]");
        expected4.add("+I[1, 0, 3]");
        expected4.add("+I[1, 1, 3]");
        expected4.add("+I[0, 2, 3]");
        expected4.add("+I[1, 2, 3]");
        expected4.add("+I[2, 0, 3]");
        expected4.add("+I[2, 1, 3]");
        expected4.add("+I[2, 2, 3]");
        expected4.add("+I[0, 3, 3]");
        expected4.add("+I[1, 3, 3]");
        expected4.add("+I[3, 0, 3]");
        expected4.add("+I[3, 1, 3]");
        expected4.add("+I[2, 3, 3]");
        expected4.add("+I[3, 2, 3]");
        expected4.add("+I[3, 3, 3]");
        Assertions.assertThat((int)((DataSplit)splits.get(3)).dataFiles().size()).isEqualTo(2);
        Assertions.assertThat(((DataSplit)splits.get(3)).dataFiles().stream().map(DataFileMeta::level).collect(Collectors.toList())).containsExactlyInAnyOrder((Object[])new Integer[]{4, 5});
        Assertions.assertThat(result4).containsExactlyElementsOf(expected4);
    }

    @Test
    public void testClusterOnEmptyData() throws Exception {
        this.createTable("pt", 1);
        Assertions.assertThatCode(() -> this.runAction(Collections.emptyList())).doesNotThrowAnyException();
    }

    @Test
    public void testMultiParallelism() throws Exception {
        FileStoreTable table = this.createTable(null, 2);
        BinaryString randomStr = BinaryString.fromString((String)IncrementalClusterActionITCase.randomString(150));
        ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
        for (int i = 0; i < 3; ++i) {
            for (int j = 0; j < 3; ++j) {
                messages.addAll(this.write(GenericRow.of((Object[])new Object[]{i, j, randomStr, 0})));
            }
        }
        this.commit(messages);
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[]{0, 1});
        List<String> result1 = this.getResult(readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType());
        ArrayList expected1 = Lists.newArrayList((Object[])new String[]{"+I[0, 0]", "+I[0, 1]", "+I[0, 2]", "+I[1, 0]", "+I[1, 1]", "+I[1, 2]", "+I[2, 0]", "+I[2, 1]", "+I[2, 2]"});
        Assertions.assertThat(result1).containsExactlyElementsOf((Iterable)expected1);
        this.runAction(Lists.newArrayList((Object[])new String[]{"--table_conf", "scan.parallelism=2"}));
        this.checkSnapshot(table);
        List splits = readBuilder.newScan().plan().splits();
        Assertions.assertThat((int)splits.size()).isEqualTo(1);
        Assertions.assertThat((int)((DataSplit)splits.get(0)).dataFiles().size()).isGreaterThanOrEqualTo(1);
        Assertions.assertThat((int)((DataFileMeta)((DataSplit)splits.get(0)).dataFiles().get(0)).level()).isEqualTo(5);
    }

    protected FileStoreTable createTable(String partitionKeys, int sinkParallelism) throws Exception {
        return this.createTable(partitionKeys, sinkParallelism, Collections.emptyMap());
    }

    protected FileStoreTable createTable(String partitionKeys, int sinkParallelism, Map<String, String> options) throws Exception {
        this.catalog.createDatabase(this.database, true);
        this.catalog.createTable(this.identifier(), IncrementalClusterActionITCase.schema(partitionKeys, sinkParallelism, options), true);
        return (FileStoreTable)this.catalog.getTable(this.identifier());
    }

    private FileStoreTable getTable() throws Exception {
        return (FileStoreTable)this.catalog.getTable(this.identifier());
    }

    private Identifier identifier() {
        return Identifier.create((String)this.database, (String)this.tableName);
    }

    private List<CommitMessage> write(GenericRow ... data) throws Exception {
        BatchWriteBuilder builder = this.getTable().newBatchWriteBuilder();
        try (BatchTableWrite batchTableWrite = builder.newWrite();){
            for (GenericRow row : data) {
                batchTableWrite.write((InternalRow)row);
            }
            List list = batchTableWrite.prepareCommit();
            return list;
        }
    }

    private void commit(List<CommitMessage> messages) throws Exception {
        BatchTableCommit commit = this.getTable().newBatchWriteBuilder().newCommit();
        commit.commit(messages);
        commit.close();
    }

    private static Schema schema(String partitionKeys, int sinkParallelism) {
        return IncrementalClusterActionITCase.schema(partitionKeys, sinkParallelism, Collections.emptyMap());
    }

    private static Schema schema(String partitionKeys, int sinkParallelism, Map<String, String> options) {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("a", (DataType)DataTypes.INT());
        schemaBuilder.column("b", (DataType)DataTypes.INT());
        schemaBuilder.column("c", (DataType)DataTypes.STRING());
        schemaBuilder.column("pt", (DataType)DataTypes.INT());
        schemaBuilder.option("bucket", "-1");
        schemaBuilder.option("num-levels", "6");
        schemaBuilder.option("num-sorted-run.compaction-trigger", "2");
        schemaBuilder.option("scan.plan-sort-partition", "true");
        schemaBuilder.option("clustering.columns", "a,b");
        schemaBuilder.option("clustering.strategy", "zorder");
        schemaBuilder.option("clustering.incremental", "true");
        schemaBuilder.option("scan.parallelism", "1");
        schemaBuilder.option("sink.parallelism", String.valueOf(sinkParallelism));
        for (String key : options.keySet()) {
            schemaBuilder.option(key, options.get(key));
        }
        if (!StringUtils.isNullOrWhitespaceOnly((String)partitionKeys)) {
            schemaBuilder.partitionKeys(new String[]{partitionKeys});
        }
        return schemaBuilder.build();
    }

    private static String randomString(int length) {
        String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
        StringBuilder sb = new StringBuilder(length);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < length; ++i) {
            sb.append(chars.charAt(random.nextInt(chars.length())));
        }
        return sb.toString();
    }

    private void checkSnapshot(FileStoreTable table) {
        Assertions.assertThat((Comparable)((Snapshot)table.latestSnapshot().get()).commitKind()).isEqualTo((Object)Snapshot.CommitKind.COMPACT);
    }

    private void runAction(List<String> extra) throws Exception {
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().batchMode().build();
        ArrayList baseArgs = Lists.newArrayList((Object[])new String[]{"compact", "--database", this.database, "--table", this.tableName});
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if (random.nextBoolean()) {
            baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--warehouse", this.warehouse}));
        } else {
            baseArgs.addAll(Lists.newArrayList((Object[])new String[]{"--catalog_conf", "warehouse=" + this.warehouse}));
        }
        baseArgs.addAll(extra);
        CompactAction action = this.createAction(CompactAction.class, baseArgs.toArray(new String[0]));
        action.withStreamExecutionEnvironment(env);
        action.run();
    }
}

