/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class PrimaryKeyFileStoreTableITCase
extends AbstractTestBase {
    // Timeout budget; the same value (180) is what collect(TableResult) passes on and what the
    // @Timeout annotations in this class use.
    private static final int TIMEOUT = 180;
    // Temporary directory of the current test; assigned in before().
    private String path;
    // Extra table options; filled in before() and rendered by createCatalogSql() as
    // 'table-default.<key>' catalog options.
    private Map<String, String> tableDefaultProperties;
    // Sizing of the randomized workload. The values match the literals 4, 64, 1024 and 10000
    // that appear in testRandom() and checkChangelogTestResult().
    private static final int NUM_PARTS = 4;
    private static final int NUM_KEYS = 64;
    private static final int NUM_VALUES = 1024;
    private static final int LIMIT = 10000;

    /**
     * Prepares per-test state: resolves the temporary directory and builds the table default
     * properties, which contain the local-merge buffer size ("5m") in a random half of the runs.
     *
     * @throws IOException declared for the temporary directory lookup
     */
    @BeforeEach
    public void before() throws IOException {
        this.path = this.getTempDirPath();
        boolean withLocalMerge = ThreadLocalRandom.current().nextBoolean();
        Map<String, String> defaults = new HashMap<>();
        this.tableDefaultProperties = defaults;
        if (withLocalMerge) {
            defaults.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "5m");
        }
    }

    /**
     * Builds the {@code CREATE CATALOG} statement for a Paimon catalog.
     *
     * <p>Each entry of {@code tableDefaultProperties} is appended as a
     * {@code 'table-default.<key>' = '<value>'} catalog option.
     *
     * @param catalogName name of the catalog to create
     * @param warehouse value of the {@code 'warehouse'} option
     * @return the DDL statement
     */
    private String createCatalogSql(String catalogName, String warehouse) {
        String extraOptions = this.tableDefaultProperties.isEmpty()
                ? ""
                : this.tableDefaultProperties.entrySet().stream()
                        .map(entry -> "'table-default." + entry.getKey() + "' = '" + entry.getValue() + "'")
                        // prefix ", " separates the extra options from the 'warehouse' option
                        .collect(Collectors.joining(", ", ", ", ""));
        return String.format("CREATE CATALOG `%s` WITH ( 'type' = 'paimon', 'warehouse' = '%s' %s )", catalogName, warehouse, extraOptions);
    }

    /**
     * Collects the rows of {@code result}, supervised with a timeout of 180.
     *
     * @param result result of a submitted query
     * @return iterator over the rows of the query
     */
    private CloseableIterator<Row> collect(TableResult result) {
        final int defaultTimeout = 180;
        return collect(result, defaultTimeout);
    }

    /**
     * Collects the rows of {@code result} while a background watchdog polls the job once per
     * second and cancels it when it has not reached a globally terminal state after
     * {@code timeout} polls.
     *
     * @param result result of a submitted query; must expose a job client
     * @param timeout number of one-second polls before the job is cancelled
     * @return iterator over the rows of the query
     * @throws IllegalStateException if {@code result} has no job client
     */
    private CloseableIterator<Row> collect(TableResult result, int timeout) {
        JobClient client = result.getJobClient()
                .orElseThrow(() -> new IllegalStateException("TableResult has no JobClient, cannot supervise the query"));
        Thread timeoutThread = new Thread(() -> {
            try {
                for (int i = 0; i < timeout; ++i) {
                    Thread.sleep(1000L);
                    if (client.getJobStatus().get().isGloballyTerminalState()) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                // keep the interrupt visible to whoever owns this thread
                Thread.currentThread().interrupt();
                client.cancel();
                throw new RuntimeException(e);
            } catch (Exception e) {
                client.cancel();
                throw new RuntimeException(e);
            }
            // timeout elapsed without the job terminating
            client.cancel();
        }, "collect-timeout-watchdog");
        // daemon: a sleeping watchdog must not keep the JVM alive after the tests finished
        timeoutThread.setDaemon(true);
        timeoutThread.start();
        return result.collect();
    }

    /** Runs the changelog scenario with the full-compaction producer triggered every 3 delta commits. */
    @Test
    @Timeout(180L)
    public void testFullCompactionTriggerInterval() throws Exception {
        List<String> tableOptions = Arrays.asList(
                "'changelog-producer' = 'full-compaction'",
                "'full-compaction.delta-commits' = '3'");
        innerTestChangelogProducing(tableOptions);
    }

    /**
     * Writes single rows in batch mode for ten seconds into a write-only table that uses the
     * full-compaction changelog producer, while a separate compaction job (checkpoint interval
     * 2000 ms) runs. Afterwards the compaction job must still be RUNNING and the streaming read
     * must deliver {@code +I[k, k * 100]} for every written key, in key order.
     */
    @Test
    @Timeout(value=180L)
    public void testFullCompactionWithLongCheckpointInterval() throws Exception {
        TableEnvironment bEnv = this.tableEnvironmentBuilder().batchMode().parallelism(1).build();
        bEnv.executeSql(this.createCatalogSql("testCatalog", this.path));
        bEnv.executeSql("USE CATALOG testCatalog");
        bEnv.executeSql("CREATE TABLE T (  k INT,  v INT,  PRIMARY KEY (k) NOT ENFORCED) WITH (  'bucket' = '1',  'changelog-producer' = 'full-compaction',  'write-only' = 'true')");
        TableEnvironment sEnv = this.tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).parallelism(1).build();
        sEnv.executeSql(this.createCatalogSql("testCatalog", this.path));
        sEnv.executeSql("USE CATALOG testCatalog");
        // try-with-resources: the streaming query is released even when an assertion below fails
        try (CloseableIterator<Row> it = this.collect(sEnv.executeSql("SELECT * FROM T"))) {
            StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().checkpointIntervalMs(2000).build();
            env.setParallelism(1);
            new CompactAction("default", "T", Collections.singletonMap("warehouse", this.path), Collections.emptyMap()).withStreamExecutionEnvironment(env).build();
            JobClient client = env.executeAsync();
            long startMs = System.currentTimeMillis();
            int currentKey = 0;
            // keep inserting one row per batch job for ten seconds
            while (System.currentTimeMillis() - startMs <= 10000L) {
                ++currentKey;
                bEnv.executeSql(String.format("INSERT INTO T VALUES (%d, %d)", currentKey, currentKey * 100)).await();
            }
            Assertions.assertThat(client.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);
            for (int i = 1; i <= currentKey; ++i) {
                Assertions.assertThat(it.hasNext()).isTrue();
                Assertions.assertThat(it.next().toString()).isEqualTo(String.format("+I[%d, %d]", i, i * 100));
            }
        }
    }

    /** Runs the changelog scenario with the lookup changelog producer. */
    @Test
    @Timeout(180L)
    public void testLookupChangelog() throws Exception {
        List<String> tableOptions = Collections.singletonList("'changelog-producer' = 'lookup'");
        innerTestChangelogProducing(tableOptions);
    }

    /**
     * Writes one row to table T2, creates tag {@code tag2} and branch {@code branch1} from it,
     * then writes three more rows to the branch and checks that a streaming read of the branch
     * yields the original row plus the three new ones.
     */
    @Test
    public void testTableReadWriteBranch() throws Exception {
        TableEnvironment sEnv = this.tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100).parallelism(1).build();
        sEnv.executeSql(this.createCatalogSql("testCatalog", this.path + "/warehouse"));
        sEnv.executeSql("USE CATALOG testCatalog");
        sEnv.executeSql("CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'bucket' = '2')");
        // try-with-resources: both streaming queries are released even when an assertion fails
        try (CloseableIterator<Row> it = this.collect(sEnv.executeSql("SELECT * FROM T2"))) {
            sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
            List<String> actual = new ArrayList<>();
            actual.add(it.next().toString());
            Assertions.assertThat(actual).containsExactlyInAnyOrder("+I[1, A]");
            sEnv.executeSql(String.format("CALL sys.create_tag('%s.%s', 'tag2', 1, '5 d')", "default", "T2"));
            sEnv.executeSql(String.format("CALL sys.create_branch('%s.%s', 'branch1', 'tag2')", "default", "T2"));
            sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");
            try (CloseableIterator<Row> branchIt = this.collect(sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */"))) {
                sEnv.executeSql("INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')").await();
                List<String> actualBranch = new ArrayList<>();
                for (int i = 0; i < 4; ++i) {
                    actualBranch.add(branchIt.next().toString());
                }
                Assertions.assertThat(actualBranch).containsExactlyInAnyOrder("+I[1, A]", "+I[10, v10]", "+I[11, v11]", "+I[12, v12]");
            }
        }
    }

    /**
     * Shared changelog scenario: a streaming job aggregates the filesystem source table S into
     * the primary-key table T ({@code SUM(i)} as key, {@code g} as value) and a streaming read of
     * T must observe the expected changelog after each of two batches written to S.
     *
     * @param options extra {@code 'key' = 'value'} table options appended to the DDL of T
     */
    private void innerTestChangelogProducing(List<String> options) throws Exception {
        TableEnvironment sEnv = this.tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100).parallelism(1).build();
        sEnv.executeSql(this.createCatalogSql("testCatalog", this.path + "/warehouse"));
        sEnv.executeSql("USE CATALOG testCatalog");
        sEnv.executeSql("CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'bucket' = '2', 'continuous.discovery-interval' = '1ms', " + String.join(", ", options) + ")");
        Path inputPath = new Path(this.path, "input");
        LocalFileIO.create().mkdirs(inputPath);
        sEnv.executeSql("CREATE TABLE `default_catalog`.`default_database`.`S` ( i INT, g STRING ) WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '" + inputPath + "', 'source.monitor-interval' = '500ms' )");
        sEnv.executeSql("INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
        // try-with-resources: the streaming read is released even when an assertion below fails
        try (CloseableIterator<Row> it = this.collect(sEnv.executeSql("SELECT * FROM T"))) {
            sEnv.executeSql("INSERT INTO `default_catalog`.`default_database`.`S` VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')").await();
            List<String> actual = new ArrayList<>();
            for (int i = 0; i < 4; ++i) {
                actual.add(it.next().toString());
            }
            Assertions.assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]", "+I[4, D]");
            // second batch adds 1 to every group's sum, so every key moves to the next value
            sEnv.executeSql("INSERT INTO `default_catalog`.`default_database`.`S` VALUES (1, 'D'), (1, 'C'), (1, 'B'), (1, 'A')").await();
            actual.clear();
            for (int i = 0; i < 8; ++i) {
                actual.add(it.next().toString());
            }
            Assertions.assertThat(actual).containsExactlyInAnyOrder("-D[1, A]", "-U[2, B]", "+U[2, A]", "-U[3, C]", "+U[3, B]", "-U[4, D]", "+U[4, C]", "+I[5, D]");
        }
    }

    /**
     * Submits two concurrent batch overwrites of the same three keys (restarts allowed) after an
     * initial load and checks that none of the three rows read back still holds its initial
     * value {@code k * 10}.
     */
    @Test
    public void testBatchJobWithConflictAndRestart() throws Exception {
        TableEnvironment batchEnv = this.tableEnvironmentBuilder().batchMode().allowRestart(10).build();
        String catalogDdl = "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + this.path + "' )";
        batchEnv.executeSql(catalogDdl);
        batchEnv.executeSql("USE CATALOG mycat");
        batchEnv.executeSql("CREATE TABLE t ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'bucket' = '2', 'full-compaction.delta-commits' = '1' )");
        batchEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)").await();

        // both overwrites are submitted before either one is awaited
        TableResult firstOverwrite = batchEnv.executeSql("INSERT INTO t /*+ OPTIONS('sink.parallelism' = '2') */ VALUES (1, 11), (2, 21), (3, 31)");
        TableResult secondOverwrite = batchEnv.executeSql("INSERT INTO t /*+ OPTIONS('sink.parallelism' = '2') */ VALUES (1, 12), (2, 22), (3, 32)");
        firstOverwrite.await();
        secondOverwrite.await();

        try (CloseableIterator<Row> rows = this.collect(batchEnv.executeSql("SELECT * FROM t"))) {
            int remaining = 3;
            while (remaining > 0) {
                Assertions.assertThat(rows).hasNext();
                Row row = rows.next();
                Object value = row.getField(1);
                Assertions.assertThat(value).isNotEqualTo((Object) ((Integer) row.getField(0) * 10));
                remaining--;
            }
        }
    }

    /**
     * Streams from a partitioned primary-key table while batch jobs insert and then update 20
     * rows, then drops and recreates the table and expects the running streaming read to fail
     * with the "next expected snapshot is too big" error.
     *
     * @param isReloadData whether data is written into the recreated table before the failure
     *     is asserted
     */
    @Timeout(value=180L)
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testRecreateTableWithException(boolean isReloadData) throws Exception {
        TableEnvironment bEnv = this.tableEnvironmentBuilder().batchMode().build();
        bEnv.executeSql(this.createCatalogSql("testCatalog", this.path + "/warehouse"));
        bEnv.executeSql("USE CATALOG testCatalog");
        bEnv.executeSql("CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH (    'bucket' = '2'\n    ,'continuous.discovery-interval' = '1s'\n)");
        TableEnvironment sEnv = this.tableEnvironmentBuilder().streamingMode().parallelism(4).checkpointIntervalMs(1000).build();
        sEnv.executeSql(this.createCatalogSql("testCatalog", this.path + "/warehouse"));
        sEnv.executeSql("USE CATALOG testCatalog");
        // try-with-resources: the streaming read was never closed before; release it on every path
        try (CloseableIterator<Row> it = this.collect(sEnv.executeSql("SELECT * FROM t"))) {
            List<String> values = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                values.add(String.format("(0, %d, %d)", i, i));
                values.add(String.format("(1, %d, %d)", i, i));
            }
            bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
            List<Row> expected = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
                expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
            }
            this.assertStreamingResult(it, expected);
            values.clear();
            for (int i = 0; i < 10; ++i) {
                values.add(String.format("(0, %d, %d)", i, i + 1));
                values.add(String.format("(1, %d, %d)", i, i + 1));
            }
            bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
            for (int i = 0; i < 10; ++i) {
                expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
                expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
                expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
                expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
            }
            // the first 20 rows were consumed above; only the 40 update rows are still pending
            this.assertStreamingResult(it, expected.subList(20, 60));
            bEnv.executeSql("DROP TABLE t");
            bEnv.executeSql("CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH (    'bucket' = '2'\n)");
            if (isReloadData) {
                bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
            }
            Assertions.assertThatCode(() -> it.next()).rootCause().hasMessageContaining("The next expected snapshot is too big! Most possible cause might be the table had been recreated.");
        }
    }

    /**
     * Writes three batches of 2000 rows into a lookup-changelog table with
     * {@code 'changelog.precommit-compact'} enabled and exactly 3 retained snapshots. After each
     * batch it checks the number of {@code compacted-changelog-} files, that no
     * {@code changelog-} files exist, and the changelog seen by a streaming read started with
     * {@code 'scan.snapshot-id' = '1'}. After the third batch the compacted changelog files of
     * the first batch must be gone.
     */
    @Test
    @Timeout(value=180L)
    public void testChangelogCompactInBatchWrite() throws Exception {
        TableEnvironment bEnv = this.tableEnvironmentBuilder().batchMode().build();
        String catalogDdl = "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + this.path + "' )";
        bEnv.executeSql(catalogDdl);
        bEnv.executeSql("USE CATALOG mycat");
        bEnv.executeSql("CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH (    'bucket' = '10',\n    'changelog-producer' = 'lookup',\n    'changelog.precommit-compact' = 'true',\n    'snapshot.num-retained.min' = '3',\n    'snapshot.num-retained.max' = '3'\n)");
        TableEnvironment sEnv = this.tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(1000).build();
        sEnv.executeSql(catalogDdl);
        sEnv.executeSql("USE CATALOG mycat");
        List<String> values = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            values.add(String.format("(0, %d, %d)", i, i));
            values.add(String.format("(1, %d, %d)", i, i));
        }
        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
        List<String> compactedChangelogs2 = this.listAllFilesWithPrefix("compacted-changelog-");
        Assertions.assertThat(compactedChangelogs2).hasSize(2);
        Assertions.assertThat(this.listAllFilesWithPrefix("changelog-")).isEmpty();
        // List<Row> (not List<Object>) so the list can be handed to assertStreamingResult
        List<Row> expected = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
            expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
        }
        this.assertStreamingResult(sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"), expected);
        values.clear();
        for (int i = 0; i < 1000; ++i) {
            values.add(String.format("(0, %d, %d)", i, i + 1));
            values.add(String.format("(1, %d, %d)", i, i + 1));
        }
        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
        Assertions.assertThat(this.listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
        Assertions.assertThat(this.listAllFilesWithPrefix("changelog-")).isEmpty();
        for (int i = 0; i < 1000; ++i) {
            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
        }
        this.assertStreamingResult(sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"), expected);
        values.clear();
        for (int i = 0; i < 1000; ++i) {
            values.add(String.format("(0, %d, %d)", i, i + 2));
            values.add(String.format("(1, %d, %d)", i, i + 2));
        }
        bEnv.executeSql("INSERT INTO t VALUES " + String.join(", ", values)).await();
        Assertions.assertThat(this.listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
        Assertions.assertThat(this.listAllFilesWithPrefix("changelog-")).isEmpty();
        LocalFileIO fileIO = LocalFileIO.create();
        // the compacted changelog files written by the first batch must have been removed
        for (String p : compactedChangelogs2) {
            Assertions.assertThat(fileIO.exists(new Path(p))).isFalse();
        }
        // drop the 2000 inserts of the first batch; copy instead of appending through a subList view
        expected = new ArrayList<>(expected.subList(2000, 6000));
        for (int i = 0; i < 1000; ++i) {
            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i + 1));
            expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i + 1));
            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 2));
            expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 2));
        }
        this.assertStreamingResult(sEnv.executeSql("SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */"), expected);
    }

    /**
     * Streams rows from a filesystem source table into a lookup-changelog table with
     * {@code 'changelog.precommit-compact'} enabled. After each of two batches of 200 rows it
     * checks the changelog seen by a streaming read, the number of
     * {@code compacted-changelog-} files, and that no {@code changelog-} files exist.
     */
    @Test
    @Timeout(value=180L)
    public void testChangelogCompactInStreamWrite() throws Exception {
        TableEnvironment sEnv = this.tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(2000).parallelism(4).build();
        sEnv.executeSql(this.createCatalogSql("testCatalog", this.path + "/warehouse"));
        sEnv.executeSql("USE CATALOG testCatalog");
        sEnv.executeSql("CREATE TABLE t ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH (    'bucket' = '10',\n    'changelog-producer' = 'lookup',\n    'changelog.precommit-compact' = 'true'\n)");
        Path inputPath = new Path(this.path, "input");
        LocalFileIO.create().mkdirs(inputPath);
        sEnv.executeSql("CREATE TABLE `default_catalog`.`default_database`.`s` ( pt INT, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '" + inputPath + "', 'source.monitor-interval' = '500ms' )");
        sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`");
        // try-with-resources: the streaming read was never closed before; release it on every path
        try (CloseableIterator<Row> it = this.collect(sEnv.executeSql("SELECT * FROM t"))) {
            List<String> values = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                values.add(String.format("(0, %d, %d)", i, i));
                values.add(String.format("(1, %d, %d)", i, i));
            }
            sEnv.executeSql("INSERT INTO `default_catalog`.`default_database`.`s` VALUES " + String.join(", ", values)).await();
            List<Row> expected = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                expected.add(Row.ofKind(RowKind.INSERT, 0, i, i));
                expected.add(Row.ofKind(RowKind.INSERT, 1, i, i));
            }
            this.assertStreamingResult(it, expected);
            List<String> compactedChangelogs2 = this.listAllFilesWithPrefix("compacted-changelog-");
            Assertions.assertThat(compactedChangelogs2).hasSize(2);
            Assertions.assertThat(this.listAllFilesWithPrefix("changelog-")).isEmpty();
            values.clear();
            for (int i = 0; i < 100; ++i) {
                values.add(String.format("(0, %d, %d)", i, i + 1));
                values.add(String.format("(1, %d, %d)", i, i + 1));
            }
            sEnv.executeSql("INSERT INTO `default_catalog`.`default_database`.`s` VALUES " + String.join(", ", values)).await();
            for (int i = 0; i < 100; ++i) {
                expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 0, i, i));
                expected.add(Row.ofKind(RowKind.UPDATE_BEFORE, 1, i, i));
                expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 0, i, i + 1));
                expected.add(Row.ofKind(RowKind.UPDATE_AFTER, 1, i, i + 1));
            }
            // the first 200 rows were consumed above; only the 400 update rows are still pending
            this.assertStreamingResult(it, expected.subList(200, 600));
            Assertions.assertThat(this.listAllFilesWithPrefix("compacted-changelog-")).hasSize(4);
            Assertions.assertThat(this.listAllFilesWithPrefix("changelog-")).isEmpty();
        }
    }

    /**
     * Walks the test directory recursively and returns the paths of all regular files whose
     * file name starts with {@code prefix}.
     *
     * @param prefix file name prefix to match
     * @return matching paths as strings
     */
    private List<String> listAllFilesWithPrefix(String prefix) throws Exception {
        java.nio.file.Path root = Paths.get(this.path);
        // Files.walk holds directory handles, so the stream must be closed
        try (Stream<java.nio.file.Path> walker = Files.walk(root)) {
            return walker
                    .filter(candidate -> Files.isRegularFile(candidate)
                            && candidate.getFileName().toString().startsWith(prefix))
                    .map(candidate -> candidate.toString())
                    .collect(Collectors.toList());
        }
    }

    /**
     * Reads at most {@code expected.size()} rows from {@code result}, closes the query and
     * asserts that the rows read have the same elements as {@code expected}.
     *
     * @param result streaming query to read from
     * @param expected rows the query is expected to produce
     */
    private void assertStreamingResult(TableResult result, List<Row> expected) throws Exception {
        List<Object> received = new ArrayList<>();
        try (CloseableIterator<Row> rows = this.collect(result)) {
            for (;;) {
                boolean wantMore = received.size() < expected.size();
                if (!wantMore || !rows.hasNext()) {
                    break;
                }
                received.add(rows.next());
            }
        }
        Assertions.assertThat(received).hasSameElementsAs(expected);
    }

    /**
     * Reads at most {@code expected.size()} rows from {@code it} and asserts that the rows read
     * have the same elements as {@code expected}. The iterator is left open for the caller.
     *
     * @param it open iterator of a streaming query
     * @param expected rows expected next from the iterator
     */
    private void assertStreamingResult(CloseableIterator<Row> it, List<Row> expected) {
        List<Object> received = new ArrayList<>();
        for (;;) {
            boolean wantMore = received.size() < expected.size();
            if (!wantMore || !it.hasNext()) {
                break;
            }
            received.add(it.next());
        }
        Assertions.assertThat(received).hasSameElementsAs(expected);
    }

    /** Randomized scenario without a changelog producer: batch mode, one producer, no failures. */
    @Test
    @Timeout(180L)
    public void testNoChangelogProducerBatchRandom() throws Exception {
        TableEnvironment batchEnv = this.tableEnvironmentBuilder()
                .batchMode()
                .build();
        testNoChangelogProducerRandom(batchEnv, 1, false);
    }

    /**
     * Randomized scenario without a changelog producer in streaming mode: checkpoint interval of
     * 100-999 ms, one or two producers, failure injection chosen at random.
     */
    @Test
    @Timeout(180L)
    public void testNoChangelogProducerStreamingRandom() throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        TableEnvironment streamEnv = this.tableEnvironmentBuilder()
                .streamingMode()
                .checkpointIntervalMs(rnd.nextInt(900) + 100)
                .allowRestart()
                .build();
        int producers = rnd.nextInt(1, 3);
        boolean injectFailures = rnd.nextBoolean();
        testNoChangelogProducerRandom(streamEnv, producers, injectFailures);
    }

    /** Randomized full-compaction changelog scenario: batch mode, one producer, no failures. */
    @Test
    @Timeout(180L)
    public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
        TableEnvironment batchEnv = this.tableEnvironmentBuilder()
                .batchMode()
                .build();
        testFullCompactionChangelogProducerRandom(batchEnv, 1, false);
    }

    /**
     * Randomized full-compaction changelog scenario in streaming mode: checkpoint interval of
     * 100-999 ms, one or two producers, failure injection chosen at random.
     */
    @Test
    @Timeout(180L)
    public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        TableEnvironment streamEnv = this.tableEnvironmentBuilder()
                .streamingMode()
                .checkpointIntervalMs(rnd.nextInt(900) + 100)
                .allowRestart()
                .build();
        int producers = rnd.nextInt(1, 3);
        boolean injectFailures = rnd.nextBoolean();
        testFullCompactionChangelogProducerRandom(streamEnv, producers, injectFailures);
    }

    /**
     * Randomized scenario with stand-alone full-compaction jobs: checkpoint interval of
     * 100-999 ms, one or two producers, conflicting compaction jobs chosen at random.
     */
    @Test
    @Timeout(180L)
    public void testStandAloneFullCompactJobRandom() throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        TableEnvironment streamEnv = this.tableEnvironmentBuilder()
                .streamingMode()
                .checkpointIntervalMs(rnd.nextInt(900) + 100)
                .allowRestart()
                .build();
        int producers = rnd.nextInt(1, 3);
        boolean withConflicts = rnd.nextBoolean();
        testStandAloneFullCompactJobRandom(streamEnv, producers, withConflicts);
    }

    /** Randomized lookup changelog scenario: batch mode, one producer, no failures. */
    @Test
    @Timeout(180L)
    public void testLookupChangelogProducerBatchRandom() throws Exception {
        TableEnvironment batchEnv = this.tableEnvironmentBuilder()
                .batchMode()
                .build();
        testLookupChangelogProducerRandom(batchEnv, 1, false);
    }

    /**
     * Randomized lookup changelog scenario in streaming mode: checkpoint interval of
     * 100-999 ms, one or two producers, failure injection chosen at random.
     */
    @Test
    @Timeout(180L)
    public void testLookupChangelogProducerStreamingRandom() throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        TableEnvironment streamEnv = this.tableEnvironmentBuilder()
                .streamingMode()
                .checkpointIntervalMs(rnd.nextInt(900) + 100)
                .allowRestart()
                .build();
        int producers = rnd.nextInt(1, 3);
        boolean injectFailures = rnd.nextBoolean();
        testLookupChangelogProducerRandom(streamEnv, producers, injectFailures);
    }

    /**
     * Randomized scenario with stand-alone compaction jobs on a lookup-changelog table:
     * checkpoint interval of 100-999 ms, one or two producers, conflicting compaction jobs
     * chosen at random.
     */
    @Test
    @Timeout(180L)
    public void testStandAloneLookupJobRandom() throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        TableEnvironment streamEnv = this.tableEnvironmentBuilder()
                .streamingMode()
                .checkpointIntervalMs(rnd.nextInt(900) + 100)
                .allowRestart()
                .build();
        int producers = rnd.nextInt(1, 3);
        boolean withConflicts = rnd.nextBoolean();
        testStandAloneLookupJobRandom(streamEnv, producers, withConflicts);
    }

    /**
     * Runs the random workload on a 4-bucket table without changelog producer, waits for every
     * insert job and verifies the final table content with a batch read. Deletion vectors are
     * switched on at random; when they are on, a single producer is used.
     *
     * @param tEnv environment used to run the workload
     * @param numProducers requested number of concurrent insert jobs
     * @param enableFailure whether file IO failures are injected
     */
    private void testNoChangelogProducerRandom(TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
        boolean deletionVectors = ThreadLocalRandom.current().nextBoolean();
        int producers = deletionVectors ? 1 : numProducers;
        String tableOptions = "'bucket' = '4','deletion-vectors.enabled' = '" + deletionVectors + "'";
        List<TableResult> insertJobs = this.testRandom(tEnv, producers, enableFailure, tableOptions);
        for (TableResult insertJob : insertJobs) {
            insertJob.await();
        }
        this.checkBatchResult(producers);
    }

    /**
     * Runs the random workload on a 4-bucket table using the full-compaction changelog producer
     * (3 delta commits, write buffer of 4mb or 8mb chosen at random), sleeps up to 5 seconds and
     * then verifies the changelog.
     *
     * @param tEnv environment used to run the workload
     * @param numProducers number of concurrent insert jobs
     * @param enableFailure whether file IO failures are injected
     */
    private void testFullCompactionChangelogProducerRandom(TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        String bufferSize = rnd.nextBoolean() ? "4mb" : "8mb";
        String tableOptions = "'bucket' = '4','write-buffer-size' = '" + bufferSize + "','changelog-producer' = 'full-compaction','full-compaction.delta-commits' = '3'";
        this.testRandom(tEnv, numProducers, enableFailure, tableOptions);
        long pauseMs = rnd.nextInt(5000);
        Thread.sleep(pauseMs);
        this.checkChangelogTestResult(numProducers);
    }

    /**
     * Runs the random workload on a 4-bucket table using the lookup changelog producer, sleeps
     * up to 5 seconds and then verifies the changelog. Write buffer size (4mb or 8mb),
     * {@code 'lookup-wait'}, deletion vectors and {@code 'changelog.precommit-compact'} are
     * chosen at random; with deletion vectors on, a single producer is used.
     *
     * @param tEnv environment used to run the workload
     * @param numProducers requested number of concurrent insert jobs
     * @param enableFailure whether file IO failures are injected
     */
    private void testLookupChangelogProducerRandom(TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        boolean enableDeletionVectors = random.nextBoolean();
        if (enableDeletionVectors) {
            numProducers = 1;
        }
        // Option key is 'write-buffer-size', as in the sibling scenarios; the previous
        // 'writer-buffer-size' spelling did not set the write buffer option.
        this.testRandom(tEnv, numProducers, enableFailure, String.format("'bucket' = '4', 'write-buffer-size' = '%s', 'changelog-producer' = 'lookup', 'lookup-wait' = '%s', 'deletion-vectors.enabled' = '%s', 'changelog.precommit-compact' = '%s'", random.nextBoolean() ? "4mb" : "8mb", random.nextBoolean(), enableDeletionVectors, random.nextBoolean()));
        Thread.sleep(random.nextInt(5000));
        this.checkChangelogTestResult(numProducers);
    }

    /**
     * Runs the random workload on a write-only, 4-bucket table using the full-compaction
     * changelog producer, then starts one compaction job (two when {@code enableConflicts})
     * with parallelism 2 and verifies the changelog. Random pauses of up to 2.5 seconds
     * precede and follow the start of the compaction jobs.
     *
     * @param tEnv environment used to run the workload
     * @param numProducers number of concurrent insert jobs
     * @param enableConflicts whether a second, competing compaction job is started
     */
    private void testStandAloneFullCompactJobRandom(TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        String bufferSize = rnd.nextBoolean() ? "4mb" : "8mb";
        String tableOptions = "'bucket' = '4','write-buffer-size' = '" + bufferSize + "','changelog-producer' = 'full-compaction','full-compaction.delta-commits' = '3','write-only' = 'true'";
        this.testRandom(tEnv, numProducers, false, tableOptions);
        Thread.sleep(rnd.nextInt(2500));

        int compactJobs = enableConflicts ? 2 : 1;
        for (int job = 0; job < compactJobs; job++) {
            int checkpointIntervalMs = rnd.nextInt(1900) + 100;
            StreamExecutionEnvironment compactEnv = this.streamExecutionEnvironmentBuilder()
                    .streamingMode()
                    .checkpointIntervalMs(checkpointIntervalMs)
                    .parallelism(2)
                    .allowRestart()
                    .build();
            new CompactAction("default", "T", Collections.singletonMap("warehouse", this.path), Collections.emptyMap())
                    .withStreamExecutionEnvironment(compactEnv)
                    .build();
            compactEnv.executeAsync();
        }

        Thread.sleep(rnd.nextInt(2500));
        this.checkChangelogTestResult(numProducers);
    }

    /**
     * Runs the random workload on a write-only, 4-bucket table using the lookup changelog
     * producer, then starts one compaction job (two when {@code enableConflicts}) with
     * parallelism 2 and verifies the changelog. Random pauses of up to 2.5 seconds precede and
     * follow the start of the compaction jobs.
     *
     * @param tEnv environment used to run the workload
     * @param numProducers number of concurrent insert jobs
     * @param enableConflicts whether a second, competing compaction job is started
     */
    private void testStandAloneLookupJobRandom(TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        String bufferSize = rnd.nextBoolean() ? "4mb" : "8mb";
        boolean lookupWait = rnd.nextBoolean();
        String tableOptions = "'bucket' = '4','write-buffer-size' = '" + bufferSize + "','changelog-producer' = 'lookup','lookup-wait' = '" + lookupWait + "','write-only' = 'true'";
        this.testRandom(tEnv, numProducers, false, tableOptions);
        Thread.sleep(rnd.nextInt(2500));

        int compactJobs = enableConflicts ? 2 : 1;
        for (int job = 0; job < compactJobs; job++) {
            int checkpointIntervalMs = rnd.nextInt(1900) + 100;
            StreamExecutionEnvironment compactEnv = this.streamExecutionEnvironmentBuilder()
                    .streamingMode()
                    .checkpointIntervalMs(checkpointIntervalMs)
                    .allowRestart()
                    .build();
            compactEnv.setParallelism(2);
            new CompactAction("default", "T", Collections.singletonMap("warehouse", this.path), Collections.emptyMap())
                    .withStreamExecutionEnvironment(compactEnv)
                    .build();
            compactEnv.executeAsync();
        }

        Thread.sleep(rnd.nextInt(2500));
        this.checkChangelogTestResult(numProducers);
    }

    /**
     * Streams table T into a {@code ResultChecker} until {@code numProducers * 4 * 64} rows whose
     * third field is at least 10000 have been seen (or the stream ends), then asserts the
     * collected changelog and verifies the table content with a batch read.
     *
     * @param numProducers number of insert jobs that produced the data
     */
    private void checkChangelogTestResult(int numProducers) throws Exception {
        TableEnvironment streamEnv = this.tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(100).parallelism(1).build();
        streamEnv.executeSql(this.createCatalogSql("testCatalog", this.path));
        streamEnv.executeSql("USE CATALOG testCatalog");

        ResultChecker checker = new ResultChecker();
        int endRecords = 0;
        try (CloseableIterator<Row> rows = this.collect(streamEnv.executeSql("SELECT * FROM T"))) {
            while (rows.hasNext()) {
                Row row = rows.next();
                checker.addChangelog(row);
                boolean isEndRecord = (Long) row.getField(2) >= 10000L;
                if (isEndRecord) {
                    endRecords++;
                    // 4 and 64 have the same values as NUM_PARTS and NUM_KEYS
                    if (endRecords == numProducers * 4 * 64) {
                        break;
                    }
                }
            }
        }
        checker.assertResult(numProducers);
        this.checkBatchResult(numProducers);
    }

    /**
     * Creates table T (through a {@code FailingFileIO} warehouse path) and a datagen source S
     * that emits the sequence 0..10255, then starts {@code numProducers} insert jobs, each
     * reading S through its own temporary view.
     *
     * <p>In each view, rows with {@code i < 10000} get random partition, key and value; rows
     * with {@code i >= 10000} get a partition and key derived from {@code i} and {@code v1 = i}.
     * Producer {@code n} shifts its keys by {@code n * 64}.
     *
     * @param tEnv environment used to run the DDL and the insert jobs
     * @param numProducers number of insert jobs to start
     * @param enableFailure whether failure injection is switched on before the inserts start
     * @param tableProperties table options for T; {@code 'continuous.discovery-interval' = '1ms'}
     *     is appended
     * @return results of the submitted insert jobs, in submission order
     */
    private List<TableResult> testRandom(TableEnvironment tEnv, int numProducers, boolean enableFailure, String tableProperties) throws Exception {
        String allProperties = tableProperties + ",'continuous.discovery-interval' = '1ms'";
        String failingName = UUID.randomUUID().toString();
        String failingPath = FailingFileIO.getFailingPath(failingName, this.path);
        // catalog and table setup run with the initial FailingFileIO settings
        FailingFileIO.reset(failingName, 0, 1);
        tEnv.executeSql(this.createCatalogSql("testCatalog", failingPath));
        tEnv.executeSql("USE CATALOG testCatalog");
        tEnv.executeSql("CREATE TABLE T(  pt STRING,  k INT,  v1 BIGINT,  v2 STRING,  PRIMARY KEY (pt, k) NOT ENFORCED) PARTITIONED BY (pt) WITH (" + allProperties + ")");
        tEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        int rowsPerSecond = 500 + ThreadLocalRandom.current().nextInt(500);
        tEnv.executeSql("CREATE TABLE `default_catalog`.`default_database`.`S` (  i INT) WITH (  'connector' = 'datagen',  'fields.i.kind' = 'sequence',  'fields.i.start' = '0',  'fields.i.end' = '10255',  'number-of-rows' = '10256',  'rows-per-second' = '" + rowsPerSecond + "')").await();
        if (enableFailure) {
            FailingFileIO.reset(failingName, 2, 10000);
        }

        // these projections are identical for every producer
        String ptSql = String.format("IF(i >= %d, CAST((i - %d) / %d AS STRING), CAST(CAST(FLOOR(RAND() * %d) AS INT) AS STRING)) AS pt", 10000, 10000, 64, 4);
        String v1Sql = String.format("IF(i >= %d, i, CAST(FLOOR(RAND() * %d) AS BIGINT)) AS v1", 10000, 1024);
        String v2Sql = "CAST(i AS STRING) || '.str' AS v2";

        List<TableResult> results = new ArrayList<>();
        for (int producer = 0; producer < numProducers; producer++) {
            String kSql = String.format("IF(i >= %d, MOD(i - %d, %d), CAST(FLOOR(RAND() * %d) AS INT)) + %d AS k", 10000, 10000, 64, 64, producer * 64);
            tEnv.executeSql(String.format("CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`", producer, ptSql, kSql, v1Sql, v2Sql));
            final int viewIndex = producer;
            TableResult insertJob = (TableResult) FailingFileIO.retryArtificialException(() -> tEnv.executeSql("INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView" + viewIndex));
            results.add(insertJob);
        }
        return results;
    }

    /**
     * Reads the full contents of table {@code T} in batch mode, feeds every row into a fresh
     * {@link ResultChecker}, and asserts the resulting state.
     *
     * @param numProducers number of producers, forwarded to the result assertions
     * @throws Exception if executing the query or closing the result iterator fails
     */
    private void checkBatchResult(int numProducers) throws Exception {
        TableEnvironment batchEnv = this.tableEnvironmentBuilder().batchMode().build();
        batchEnv.executeSql(this.createCatalogSql("testCatalog", this.path));
        batchEnv.executeSql("USE CATALOG testCatalog");

        ResultChecker checker = new ResultChecker();
        try (CloseableIterator<Row> rows = this.collect(batchEnv.executeSql("SELECT * FROM T"))) {
            // Drain the iterator, applying each row to the checker in arrival order.
            rows.forEachRemaining(checker::addChangelog);
        }
        checker.assertResult(numProducers);
    }

    /**
     * Replays changelog rows key by key, asserting that each row is a legal successor of the
     * previous row for the same key, and finally asserts the materialized state.
     *
     * <p>A key is {@code field0|field1} of the row and a value is {@code field2|field3}.
     */
    private static class ResultChecker {
        /** Current value per key; an entry exists only while the key has a live value. */
        private final Map<String, String> valueMap = new HashMap<>();
        /** Kind of the most recently applied changelog row per key. */
        private final Map<String, RowKind> kindMap = new HashMap<>();

        private ResultChecker() {
        }

        /**
         * Applies one changelog row, asserting it is consistent with the state so far.
         *
         * <ul>
         *   <li>{@code INSERT}: key must have no live value and must be new or last deleted.
         *   <li>{@code UPDATE_AFTER}: key must have no live value and must follow an
         *       {@code UPDATE_BEFORE}.
         *   <li>{@code UPDATE_BEFORE} / {@code DELETE}: the live value must equal the row's
         *       value and the previous kind must be {@code INSERT} or {@code UPDATE_AFTER}.
         * </ul>
         *
         * @param row changelog row to apply
         * @throws UnsupportedOperationException if the row kind is none of the above
         */
        private void addChangelog(Row row) {
            String key = row.getField(0) + "|" + row.getField(1);
            String value = row.getField(2) + "|" + row.getField(3);
            RowKind kind = row.getKind();
            // kindMap never holds null values, so null here means "key not seen before".
            RowKind previousKind = this.kindMap.get(key);

            switch (kind) {
                case INSERT: {
                    Assertions.assertThat(this.valueMap.containsKey(key)).isFalse();
                    boolean newOrDeleted = previousKind == null || previousKind == RowKind.DELETE;
                    Assertions.assertThat(newOrDeleted).isTrue();
                    this.valueMap.put(key, value);
                    break;
                }
                case UPDATE_AFTER: {
                    Assertions.assertThat(this.valueMap.containsKey(key)).isFalse();
                    Assertions.assertThat(previousKind).isEqualTo(RowKind.UPDATE_BEFORE);
                    this.valueMap.put(key, value);
                    break;
                }
                case UPDATE_BEFORE:
                case DELETE: {
                    // A retraction must carry exactly the value that is currently live.
                    Assertions.assertThat(this.valueMap.get(key)).isEqualTo(value);
                    boolean hadLiveValue = previousKind == RowKind.INSERT || previousKind == RowKind.UPDATE_AFTER;
                    Assertions.assertThat(hadLiveValue).isTrue();
                    this.valueMap.remove(key);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unknown row kind " + kind);
                }
            }
            this.kindMap.put(key, kind);
        }

        /**
         * Asserts the final materialized state: exactly {@code 256 * numProducers} live keys,
         * and for every partition {@code p} in {@code [0, 4)} and key {@code k} in
         * {@code [0, 64 * numProducers)} the value {@code x|x.str} with
         * {@code x = 10000 + p * 64 + k % 64}.
         *
         * @param numProducers number of producers that wrote to the table
         */
        private void assertResult(int numProducers) {
            Assertions.assertThat(this.valueMap.size()).isEqualTo(256 * numProducers);

            int keysPerPartition = 64 * numProducers;
            for (int part = 0; part < 4; part++) {
                for (int k = 0; k < keysPerPartition; k++) {
                    int x = 10000 + part * 64 + k % 64;
                    String expectedValue = x + "|" + x + ".str";
                    String actualValue = this.valueMap.get(part + "|" + k);
                    Assertions.assertThat(actualValue).isEqualTo(expectedValue);
                }
            }
        }
    }
}

