/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkTableFactory;
import org.apache.paimon.flink.FlinkTestBase;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class ReadWriteTableITCase
extends AbstractTestBase {
    private final Map<String, String> streamingReadOverwrite = Collections.singletonMap(CoreOptions.STREAMING_READ_OVERWRITE.key(), "true");
    private final Map<String, String> staticPartitionOverwrite = Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
    @TempDir
    public static java.nio.file.Path externalPath1;

    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init(this.getTempDirPath());
    }

    @Test
    public void testBatchReadWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt String"), Arrays.asList("currency", "dt"), Collections.emptyList(), Collections.singletonList("dt"));
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 114, '2022-01-01')", "('Yen', 1, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        ReadWriteTableTestUtil.insertOverwritePartition(table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt IN ('2022-01-02')"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-02"})));
        List<Row> expectedPartitionRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt <> '2022-01-02'"), expectedPartitionRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt IN ('2022-01-01')"), expectedPartitionRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate >= 100"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt = '2022-01-02' AND rate >= 100"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "dt", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "dt, currency, rate", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01", "US Dollar", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01", "Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01", "Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02", "Euro", 100L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02", "Yen", 1L})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency, dt", "WHERE rate = 114"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", "2022-01-01"})));
    }

    @Test
    public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathRoundRobinStrategy() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString());
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "ROUND-ROBIN");
        this.checkExternalPathTestResult(options, externalPath1.toString());
    }

    @Test
    public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathSpecificFStrategy() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString());
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "specific-fs");
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS.key(), "file");
        this.checkExternalPathTestResult(options, externalPath1.toString());
    }

    public void checkExternalPathTestResult(Map<String, String> options, String externalPath) throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt String"), Arrays.asList("currency", "dt"), Collections.emptyList(), Collections.singletonList("dt"), options);
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 114, '2022-01-01')", "('Yen', 1, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkExternalFileStorePath(Arrays.asList("dt=2022-01-01", "dt=2022-01-02"), externalPath);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        ReadWriteTableTestUtil.insertOverwritePartition(table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt IN ('2022-01-02')"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-02"})));
        List<Row> expectedPartitionRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt <> '2022-01-02'"), expectedPartitionRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt IN ('2022-01-01')"), expectedPartitionRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate >= 100"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt = '2022-01-02' AND rate >= 100"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "dt", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "dt, currency, rate", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01", "US Dollar", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01", "Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-01", "Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02", "Euro", 100L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02", "Yen", 1L})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency, dt", "WHERE rate = 114"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", "2022-01-01"})));
    }

    @Test
    public void testNaNType() throws Exception {
        ReadWriteTableTestUtil.bEnv.executeSql("CREATE TEMPORARY TABLE S ( a DOUBLE,b DOUBLE,c STRING) WITH ( 'connector' = 'filesystem', 'format'='json' , 'path' ='" + ReadWriteTableTestUtil.warehouse + "/S' )");
        ReadWriteTableTestUtil.bEnv.executeSql("INSERT INTO S VALUES (1.0,2.0,'a'),\n(0.0,0.0,'b'),\n(1.0,1.0,'c'),\n(0.0,0.0,'d'),\n(1.0,0.0,'e'),\n(0.0,0.0,'f'),\n(-1.0,0.0,'g'),\n(1.0,-1.0,'h'),\n(1.0,-2.0,'i')").await();
        ReadWriteTableTestUtil.bEnv.executeSql("CREATE TABLE T (d STRING, e DOUBLE)");
        ReadWriteTableTestUtil.bEnv.executeSql("INSERT INTO T SELECT c,a/b FROM S").await();
        BlockingIterator iterator = BlockingIterator.of((Iterator)ReadWriteTableTestUtil.bEnv.executeSql("SELECT * FROM  T").collect());
        Assertions.assertThat((List)iterator.collect(9)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"a", 0.5}), Row.of((Object[])new Object[]{"b", Double.NaN}), Row.of((Object[])new Object[]{"c", 1.0}), Row.of((Object[])new Object[]{"d", Double.NaN}), Row.of((Object[])new Object[]{"e", Double.POSITIVE_INFINITY}), Row.of((Object[])new Object[]{"f", Double.NaN}), Row.of((Object[])new Object[]{"g", Double.NEGATIVE_INFINITY}), Row.of((Object[])new Object[]{"h", -1.0}), Row.of((Object[])new Object[]{"i", -0.5})});
    }

    @Test
    public void testBatchReadWriteWithPartitionedRecordsWithoutPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt String"), Collections.emptyList(), Collections.singletonList("currency"), Collections.singletonList("dt"));
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('Yen', 1, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('US Dollar', 114, '2022-01-01')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt >= '2022-01-01'"), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'US Dollar'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt = '2022-01-01' OR rate > 115"), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency, dt", "WHERE rate = 119"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", "2022-01-02"})));
    }

    @Test
    public void testBatchReadWriteWithNonPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.singletonList("currency"), Collections.emptyList(), Collections.emptyList());
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102)", "('Yen', 1)", "('Euro', 119)");
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", ""), initialRecords);
        ReadWriteTableTestUtil.insertOverwrite(table, "('Euro', 100)");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L})));
        ReadWriteTableTestUtil.insertOverwrite(table, "('US Dollar', 102)", "('Yen', 1)", "('Euro', 119)");
        List<Row> expectedFieldRecords = Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Euro'"), expectedFieldRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate > 102 AND rate <= 119"), expectedFieldRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "rate", "WHERE currency IN ('Yen')"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L})));
    }

    @Test
    public void testBatchReadWriteWithNonPartitionedRecordsWithoutPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.emptyList(), Collections.singletonList("currency"), Collections.emptyList());
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102)", "('Euro', 114)", "('Yen', 1)", "('Euro', 114)", "('Euro', 119)");
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate >= 1"), initialRecords);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Euro'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "currency", "WHERE rate > 100 OR currency = 'Yen'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"})));
    }

    @Test
    public void testStreamingReadWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Euro", 116L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}));
        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"), Arrays.asList("currency", "dt"), Collections.singletonList("dt"), initialRecords, "dt:2022-01-01;dt:2022-01-02", false, "I,UA,D");
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"), Arrays.asList("currency", "dt"), Collections.emptyList(), Collections.singletonList("dt"));
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}))).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt < '2022-01-02'"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}))).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate = 102"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}))).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE rate = 102 OR dt < '2022-01-02'"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01"}))).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "currency", "WHERE rate = 102 OR dt < '2022-01-02'"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"}))).close();
    }

    @Test
    void testStreamingReadWriteWithNonPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Euro", 116L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", 1L}));
        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.singletonList("currency"), Collections.emptyList(), initialRecords, null, false, "I,UA,D");
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.singletonList("currency"), Collections.emptyList(), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}))).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Yen'"), Collections.emptyList()).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "currency", ""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro"}))).close();
        ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "currency", "WHERE rate = 102"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar"}))).close();
    }

    @Test
    public void testDynamicOverwrite() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("pk INT", "part0 INT", "part1 STRING", "v STRING"), Arrays.asList("pk", "part0", "part1"), Collections.emptyList(), Arrays.asList("part0", "part1"), this.streamingReadOverwrite);
        ReadWriteTableTestUtil.insertInto(table, "(1, 1, 'A', 'Hi')", "(2, 1, 'A', 'Hello')", "(3, 1, 'A', 'World')", "(4, 1, 'B', 'To')", "(5, 1, 'B', 'Apache')", "(6, 1, 'B', 'Paimon')", "(7, 2, 'A', 'Test')", "(8, 2, 'B', 'Case')");
        BlockingIterator<Row, Row> streamItr = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, 1, "A", "Hi"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, 1, "A", "Hello"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, 1, "A", "World"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, 1, "B", "To"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, 1, "B", "Apache"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, 1, "B", "Paimon"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, 2, "A", "Test"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, 2, "B", "Case"})));
        ReadWriteTableTestUtil.bEnv.executeSql(String.format("INSERT OVERWRITE `%s` VALUES (4, 1, 'B', 'Where'), (5, 1, 'B', 'When'), (10, 2, 'A', 'Static'), (11, 2, 'A', 'Dynamic')", table)).await();
        Assertions.assertThat((List)streamItr.collect(8)).containsExactlyInAnyOrder((Object[])new Row[]{TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, 1, "B", "To"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{5, 1, "B", "Apache"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{6, 1, "B", "Paimon"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{7, 2, "A", "Test"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, 1, "B", "Where"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, 1, "B", "When"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, 2, "A", "Static"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, 2, "A", "Dynamic"})});
        ReadWriteTableTestUtil.assertNoMoreRecords(streamItr);
        streamItr.close();
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, 1, "A", "Hi"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, 1, "A", "Hello"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, 1, "A", "World"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, 1, "B", "Where"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, 1, "B", "When"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, 2, "A", "Static"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, 2, "A", "Dynamic"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, 2, "B", "Case"})));
    }

    @Test
    public void testPurgeTableUsingBatchOverWrite() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("k0 INT", "k1 STRING", "v STRING"), Collections.emptyList(), Collections.singletonList("k0"), Collections.emptyList(), this.staticPartitionOverwrite);
        this.validatePurgingResult(table, "", "*", Collections.emptyList());
    }

    @Test
    public void testPurgePartitionUsingBatchOverWrite() throws Exception {
        List<String> fieldsSpec = Arrays.asList("k0 INT", "k1 STRING", "v STRING");
        String table = ReadWriteTableTestUtil.createTable(fieldsSpec, Collections.emptyList(), Collections.singletonList("k1"), Collections.singletonList("k0"), this.staticPartitionOverwrite);
        this.validatePurgingResult(table, "PARTITION (k0 = 0)", "k1, v", Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "store"})));
        table = ReadWriteTableTestUtil.createTable(fieldsSpec, Collections.emptyList(), Collections.singletonList("v"), Arrays.asList("k0", "k1"), this.staticPartitionOverwrite);
        this.validatePurgingResult(table, "PARTITION (k0 = 0)", "k1, v", Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "store"})));
        table = ReadWriteTableTestUtil.createTable(fieldsSpec, Collections.emptyList(), Collections.singletonList("v"), Arrays.asList("k0", "k1"), this.staticPartitionOverwrite);
        this.validatePurgingResult(table, "PARTITION (k0 = 0, k1 = '2023-01-01')", "v", Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{0, "2023-01-02", "world"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "store"})));
    }

    @Test
    public void testStreamingReadOverwriteWithPartitionedRecords() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt String"), Arrays.asList("currency", "dt"), Collections.emptyList(), Collections.singletonList("dt"), this.streamingReadOverwrite);
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 114, '2022-01-01')", "('Yen', 1, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));
        ReadWriteTableTestUtil.insertOverwritePartition(table, "PARTITION (dt = '2022-01-01')", "('US Dollar', 120)");
        BlockingIterator<Row, Row> streamingItr = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 120L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"})));
        ReadWriteTableTestUtil.insertOverwritePartition(table, "PARTITION (dt = '2022-01-02')", "('Euro', 100)", "('Yen', 1)");
        ReadWriteTableTestUtil.validateStreamingReadResult(streamingItr, Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 100L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-02"})));
        streamingItr.close();
    }

    @Test
    public void testStreamingReadOverwriteWithoutPartitionedRecords() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"), Collections.singletonList("currency"), Collections.emptyList(), Collections.emptyList(), this.streamingReadOverwrite);
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102, '2022-01-01')", "('Yen', 1, '2022-01-02')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());
        BlockingIterator<Row, Row> streamingItr = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "currency, rate", "WHERE dt = '2022-01-02'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L})));
        ReadWriteTableTestUtil.insertOverwrite(table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')");
        ReadWriteTableTestUtil.validateStreamingReadResult(streamingItr, Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Euro", 119L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 100L})));
        streamingItr.close();
    }

    @Test
    public void testStreamingReadOverwriteWithDeleteRecords() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"), Collections.singletonList("currency"), Collections.emptyList(), Collections.emptyList(), this.streamingReadOverwrite);
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102, '2022-01-01')", "('Yen', 1, '2022-01-02')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.bEnv.executeSql(String.format("DELETE FROM %s WHERE currency = 'Euro'", table)).await();
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());
        BlockingIterator<Row, Row> streamingItr = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildQuery(table, "currency, rate", "WHERE dt = '2022-01-02'"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L})));
        ReadWriteTableTestUtil.insertOverwrite(table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')");
        ReadWriteTableTestUtil.validateStreamingReadResult(streamingItr, Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 100L})));
        ReadWriteTableTestUtil.assertNoMoreRecords(streamingItr);
        streamingItr.close();
    }

    @Test
    public void testUnsupportStreamingReadOverwriteWithoutPk() {
        Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt String"), Collections.emptyList(), Collections.singletonList("currency"), Collections.singletonList("dt"), this.streamingReadOverwrite)).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(RuntimeException.class, (String)"Doesn't support streaming read the changes from overwrite when the primary keys are not defined.")});
    }

    @Test
    public void testLike() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("f0 INT", "f1 STRING"), Collections.emptyList(), Collections.singletonList("f0"), Collections.emptyList());
        ReadWriteTableTestUtil.insertInto(table, "(1, 'test_1')", "(2, 'test_2')", "(1, 'test_%')", "(2, 'test%2')", "(3, 'university')", "(4, 'very')", "(5, 'yield')");
        ReadWriteTableTestUtil.insertInto(table, "(7, 'villa')", "(8, 'tests')", "(20, 'test_123')", "(9, 'valley')", "(10, 'tested')", "(100, 'test%fff')");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'test%'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "test_%"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "test%2"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "tests"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "tested"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{20, "test_123"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{100, "test%fff"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'v%'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, "very"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "villa"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "valley"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'test=_%' ESCAPE '='"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "test_%"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{20, "test_123"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'test=__' ESCAPE '='"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "test_1"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "test_2"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "test_%"})));
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 LIKE 'test$%%' ESCAPE '$'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "test%2"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{100, "test%fff"})));
    }

    @Test
    public void testIn() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "aaa"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "bbb"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "ccc"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, "ddd"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "eee"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "aaa"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "bbb"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "ccc"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "ddd"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "eee"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "aaa"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{12, "bbb"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{13, "ccc"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{14, "ddd"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{15, "eee"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{16, "aaa"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{17, "bbb"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{18, "ccc"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{19, "ddd"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{20, "eee"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{21, "fff"}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("f0 INT", "f1 STRING"), Collections.emptyList(), Collections.singletonList("f0"), Collections.emptyList());
        ReadWriteTableTestUtil.insertInto(table, "(1, 'aaa')", "(2, 'bbb')", "(3, 'ccc')", "(4, 'ddd')", "(5, 'eee')", "(6, 'aaa')", "(7, 'bbb')", "(8, 'ccc')", "(9, 'ddd')", "(10, 'eee')", "(11, 'aaa')", "(12, 'bbb')", "(13, 'ccc')", "(14, 'ddd')", "(15, 'eee')", "(16, 'aaa')", "(17, 'bbb')", "(18, 'ccc')", "(19, 'ddd')", "(20, 'eee')", "(21, 'fff')");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f0 IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)"), initialRecords);
        ArrayList<Row> expected = new ArrayList<Row>(initialRecords);
        expected.remove(20);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE f1 IN ('aaa', 'bbb', 'ccc', 'ddd', 'eee')"), expected);
    }

    @Test
    public void testUnsupportedPredicate() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"), Arrays.asList("currency", "dt"), Collections.emptyList(), Collections.singletonList("dt"));
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('Yen', 1, '2022-01-01')", "('Euro', 114, '2022-01-01')", "('US Dollar', 114, '2022-01-01')", "('Euro', 119, '2022-01-02')");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency SIMILAR TO 'Euro'"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02"})));
    }

    @Test
    public void testSourceParallelism() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}));
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.emptyList(), Collections.singletonList("currency"), Collections.emptyList(), Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "false"));
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102)", "('Euro', 114)", "('Yen', 1)", "('Euro', 114)", "('Euro', 119)");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), initialRecords);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildSimpleQuery(table))).isEqualTo(ReadWriteTableTestUtil.bExeEnv.getParallelism());
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "false");
                this.put(FlinkConnectorOptions.SCAN_PARALLELISM.key(), "66");
            }
        }))).isEqualTo(66);
    }

    @Test
    void testConvertRowType2Serializer() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env, (EnvironmentSettings)EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql("CREATE CATALOG my_catalog WITH (\n    'type' = 'paimon',\n    'warehouse' = '" + this.getTempDirPath() + "'\n)");
        tEnv.executeSql("USE CATALOG my_catalog");
        tEnv.executeSql("CREATE TABLE tmp (\nexecution\nROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY<ROW<`force_step` ROW<`reason` STRING>, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY<FLOAT>, `experiments` ARRAY<ROW<`experiment_ref` INT, `score` FLOAT> NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY<ROW<`method` STRING, `start_millis` BIGINT, `duration_millis` INT> NOT NULL>, `execution_stats` ROW<`stages` ARRAY<ROW<`key` INT, `value` ROW<`stats` ARRAY<ROW<`key` INT, `value` BIGINT> NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `model_ref` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY<ROW<`key` BIGINT, `value` ROW<`bucket` INT, `value` FLOAT>> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY<STRING>, `ids` ARRAY<STRING>, `variable_logs` ARRAY<ROW<`name` STRING, `values` ARRAY<FLOAT>> NOT NULL>, `allocation_logs` ARRAY<ROW<`indexes` ARRAY<INT>, `name` STRING, `positions_considered` ARRAY<INT>, `positions_filled` ARRAY<INT>> NOT NULL>>, `experiments` ARRAY<ROW<`name` STRING, `cohort_arm` INT> NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>);");
        Assertions.assertThatCode(() -> tEnv.executeSql("INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN, `struct_value` ROW<`fields` ARRAY<ROW<`key` STRING, `value` ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN>> NOT NULL>>, `list_value` ROW<`values` ARRAY<ROW<`null_value` STRING, `number_value` DOUBLE, `string_value` STRING, `bool_value` BOOLEAN> NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY<ROW<`force_step` ROW<`reason` STRING>, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY<FLOAT>, `experiments` ARRAY<ROW<`experiment_ref` INT, `score` FLOAT> NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY<ROW<`method` STRING, `start_millis` BIGINT, `duration_millis` INT> NOT NULL>, `execution_stats` ROW<`stages` ARRAY<ROW<`key` INT, `value` ROW<`stats` ARRAY<ROW<`key` INT, `value` BIGINT> NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY<ROW<`key` INT, `value` FLOAT> NOT NULL>, `categorical` ARRAY<ROW<`key` INT, `value` STRING> NOT NULL>, `sparse` ARRAY<ROW<`key` BIGINT, `value` FLOAT> NOT NULL>, `sparse_id` ARRAY<ROW<`key` BIGINT, `value` BIGINT> NOT NULL>, `embeddings` ARRAY<ROW<`key` BIGINT, `value` ROW<`embeddings` ARRAY<FLOAT>>> NOT NULL>, `feature_references` ARRAY<ROW<`type` STRING, `key` STRING, `version` STRING, `timestamp` BIGINT> NOT NULL>, `sparse_id_list` ARRAY<ROW<`key` BIGINT, `value` ROW<`ids` ARRAY<BIGINT>>> NOT NULL>, `user_events` ROW<`user_events` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>, `user_events_all` ARRAY<ROW<`event_type` BIGINT, `content_id` STRING, `timestamp` BIGINT, `custom_event_type` STRING> NOT NULL>>, `string_features` ARRAY<ROW<`key` BIGINT, `value` STRING> NOT NULL>>>, `model_ref` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY<ROW<`key` STRING, `value` ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT>> NOT NULL>, `model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY<ROW<`score` FLOAT, `score_source` STRING, `model_type` STRING, `model_id` STRING, `model_index` INT> NOT NULL>, `backoff_predictors` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>, `models` ARRAY<ROW<`model_id` STRING, `model_type` STRING, `prediction_type` STRING, `name` STRING, `feature_id` BIGINT, `is_assigned` BOOLEAN> NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY<ROW<`key` BIGINT, `value` ROW<`bucket` INT, `value` FLOAT>> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY<STRING>, `ids` ARRAY<STRING>, `variable_logs` ARRAY<ROW<`name` STRING, `values` ARRAY<FLOAT>> NOT NULL>, `allocation_logs` ARRAY<ROW<`indexes` ARRAY<INT>, `name` STRING, `positions_considered` ARRAY<INT>, `positions_filled` ARRAY<INT>> NOT NULL>>, `experiments` ARRAY<ROW<`name` STRING, `cohort_arm` INT> NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))")).doesNotThrowAnyException();
    }

    @Test
    public void testInferParallelism() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.emptyList(), Collections.singletonList("currency"), Collections.emptyList(), (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST.key(), "1KB");
                this.put(CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(), "1KB");
                this.put(CoreOptions.BUCKET.key(), "2");
            }
        });
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true")))).isEqualTo(1);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true");
                this.put(FlinkConnectorOptions.SCAN_PARALLELISM.key(), "3");
            }
        }))).isEqualTo(3);
        Assertions.assertThatThrownBy(() -> this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true");
                this.put(FlinkConnectorOptions.SCAN_PARALLELISM.key(), "-2");
            }
        }))).hasMessageContaining("The parallelism of an operator must be at least 1");
        ReadWriteTableTestUtil.insertInto(table, "('Euro', 119)");
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102)");
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true")))).isEqualTo(2);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "WHERE currency='Euro'", Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true")))).isEqualTo(1);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", 1L, Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true")))).isEqualTo(1);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", 3L, Collections.singletonMap(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true")))).isEqualTo(1);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "false");
                this.put(FlinkConnectorOptions.SCAN_PARALLELISM.key(), "3");
            }
        }))).isEqualTo(3);
        Assertions.assertThat((int)this.sourceParallelism(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key(), "true");
                this.put(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM.key(), "1");
            }
        }))).isEqualTo(1);
        Assertions.assertThat((int)this.sourceParallelismStreaming(ReadWriteTableTestUtil.buildQueryWithTableOptions(table, "*", "", new HashMap<String, String>()))).isEqualTo(2);
    }

    @ParameterizedTest
    @MethodSource(value={"testSinkParallelismParameters"})
    public void testSinkParallelism(boolean isFixedBucket, boolean hasPrimaryKey, boolean isSinkParallelismSet) throws Exception {
        this.testSinkParallelism(isFixedBucket, hasPrimaryKey, isSinkParallelismSet ? Integer.valueOf(23) : null);
    }

    @Test
    public void testChangeBucketNumber() throws Exception {
        String table = "MyTable_" + UUID.randomUUID();
        ReadWriteTableTestUtil.bEnv.executeSql(String.format("CREATE TABLE `%s` (\ncurrency STRING,\n rate BIGINT,\n dt STRING\n) PARTITIONED BY (dt)\nWITH (\n 'bucket' = '2',\n 'bucket-key' = 'currency'\n)", table));
        ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102, '2022-06-20')");
        this.assertChangeBucketWithoutRescale(table, 3);
        this.assertChangeBucketWithoutRescale(table, 1);
    }

    @Test
    public void testStreamingInsertOverwrite() {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt String"), Collections.emptyList(), Collections.singletonList("currency"), Collections.singletonList("dt"));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.sEnv.executeSql(String.format("INSERT OVERWRITE `%s` VALUES('US Dollar', 102, '2022-06-20')", table))).isInstanceOf(UnsupportedOperationException.class)).hasMessage("Paimon doesn't support streaming INSERT OVERWRITE.");
    }

    @Test
    public void testPhysicalColumnComments() {
        String ddl = "CREATE TABLE T(a INT COMMENT 'comment of a', b INT);";
        ReadWriteTableTestUtil.bEnv.executeSql(ddl);
        List result = CollectionUtil.iteratorToList((Iterator)ReadWriteTableTestUtil.bEnv.executeSql("DESC T").collect()).stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I[a, INT, true, null, null, null, comment of a]", "+I[b, INT, true, null, null, null, null]"});
    }

    @Test
    public void testComputedColumnComments() {
        String ddl = "CREATE TABLE T(a INT , b INT, c AS a + b COMMENT 'computed');";
        ReadWriteTableTestUtil.bEnv.executeSql(ddl);
        List result = CollectionUtil.iteratorToList((Iterator)ReadWriteTableTestUtil.bEnv.executeSql("DESC T").collect()).stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])new String[]{"+I[a, INT, true, null, null, null, null]", "+I[b, INT, true, null, null, null, null]", "+I[c, INT, true, null, AS `a` + `b`, null, computed]"});
    }

    @Test
    public void testCleanedSchemaOptions() {
        String ddl = "CREATE TABLE T (\nid INT,\nprice INT,\nrecord_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,\ncomp AS price * 2,\norder_time TIMESTAMP(3),\nWATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,\nPRIMARY KEY (id) NOT ENFORCED\n);";
        ReadWriteTableTestUtil.bEnv.executeSql(ddl);
        SchemaManager schemaManager = new SchemaManager((FileIO)LocalFileIO.create(), new Path(ReadWriteTableTestUtil.warehouse, "default.db/T"));
        TableSchema schema = (TableSchema)schemaManager.latest().get();
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("schema.2.name", "record_time");
        expected.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE");
        expected.put("schema.2.metadata", "timestamp");
        expected.put("schema.2.virtual", "true");
        expected.put("schema.3.name", "comp");
        expected.put("schema.3.data-type", "INT");
        expected.put("schema.3.expr", "`price` * 2");
        expected.put("schema.watermark.0.rowtime", "order_time");
        expected.put("schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
        expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        Assertions.assertThat((Map)schema.options()).containsExactlyInAnyOrderEntriesOf(expected);
        this.validateSchemaOptionResult();
    }

    @Test
    public void testReadFromOldStyleSchemaOptions() throws Exception {
        HashMap<String, String> oldStyleOptions = new HashMap<String, String>();
        oldStyleOptions.put("schema.0.name", "id");
        oldStyleOptions.put("schema.0.data-type", "INT NOT NULL");
        oldStyleOptions.put("schema.1.name", "price");
        oldStyleOptions.put("schema.1.data-type", "INT");
        oldStyleOptions.put("schema.2.name", "record_time");
        oldStyleOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE");
        oldStyleOptions.put("schema.2.metadata", "timestamp");
        oldStyleOptions.put("schema.2.virtual", "true");
        oldStyleOptions.put("schema.3.name", "comp");
        oldStyleOptions.put("schema.3.data-type", "INT");
        oldStyleOptions.put("schema.3.expr", "`price` * 2");
        oldStyleOptions.put("schema.4.name", "order_time");
        oldStyleOptions.put("schema.4.data-type", "TIMESTAMP(3)");
        oldStyleOptions.put("schema.watermark.0.rowtime", "order_time");
        oldStyleOptions.put("schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
        oldStyleOptions.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        oldStyleOptions.put("schema.primary-key.name", "constrain_pk");
        oldStyleOptions.put("schema.primary-key.columns", "id");
        Schema schema = Schema.newBuilder().column("id", DataTypes.INT().notNull()).column("price", (DataType)DataTypes.INT()).column("order_time", (DataType)DataTypes.TIMESTAMP((int)3)).options(oldStyleOptions).primaryKey(new String[]{"id"}).build();
        SchemaManager schemaManager = new SchemaManager((FileIO)LocalFileIO.create(), new Path(ReadWriteTableTestUtil.warehouse, "default.db/T"));
        schemaManager.createTable(schema);
        this.validateSchemaOptionResult();
    }

    @ParameterizedTest
    @ValueSource(strings={"deduplicate", "partial-update"})
    public void testUpdateWithPrimaryKey(String mergeEngine) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.MERGE_ENGINE.key(), mergeEngine);
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"), Arrays.asList("id", "dt"), Collections.emptyList(), Collections.singletonList("dt"), options);
        ReadWriteTableTestUtil.insertInto(table, "(1, 'US Dollar', 114, '2022-01-01')", "(2, 'UNKNOWN', -1, '2022-01-01')", "(3, 'Euro', 114, '2022-01-01')", "(3, 'Euro', 119, '2022-01-02')");
        String updateStatement = String.format("UPDATE %s SET currency = 'Yen', rate = 1 WHERE currency = 'UNKNOWN' and dt = '2022-01-01'", table);
        ReadWriteTableTestUtil.bEnv.executeSql(updateStatement).await();
        String querySql = String.format("SELECT * FROM %s", table);
        String rowKind = mergeEngine.equals("deduplicate") ? "+U" : "+I";
        ReadWriteTableTestUtil.testBatchRead(querySql, Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)rowKind, (Object[])new Object[]{2L, "Yen", 1L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Euro", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Euro", 119L, "2022-01-02"})));
    }

    @Test
    public void testUpdateWithoutPrimaryKey() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"), Collections.emptyList(), Collections.singletonList("id"), Collections.singletonList("dt"), options);
        ReadWriteTableTestUtil.insertInto(table, "(1, 'US Dollar', 114, '2022-01-01')", "(2, 'UNKNOWN', -1, '2022-01-01')", "(3, 'Euro', 114, '2022-01-01')", "(3, 'Euro', 119, '2022-01-02')");
        String updateStatement = String.format("UPDATE %s SET currency = 'Yen', rate = 1 WHERE currency = 'UNKNOWN' and dt = '2022-01-01'", table);
        Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.bEnv.executeSql(updateStatement).await()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class)});
    }

    @Test
    public void testDeleteWithPrimaryKey() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"), Arrays.asList("id", "dt"), Collections.emptyList(), Collections.singletonList("dt"), Collections.emptyMap());
        ReadWriteTableTestUtil.insertInto(table, "(1, 'US Dollar', 114, '2022-01-01')", "(2, 'UNKNOWN', -1, '2022-01-01')", "(3, 'Euro', 119, '2022-01-02')");
        String deleteStatement = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement).await();
        String querySql = String.format("SELECT * FROM %s", table);
        ReadWriteTableTestUtil.testBatchRead(querySql, Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Euro", 119L, "2022-01-02"})));
    }

    @Test
    public void testDeleteWithoutPrimaryKey() throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"), Collections.emptyList(), Collections.singletonList("id"), Collections.singletonList("dt"), options);
        ReadWriteTableTestUtil.insertInto(table, "(1, 'US Dollar', 114, '2022-01-01')", "(2, 'UNKNOWN', -1, '2022-01-01')", "(3, 'Euro', 119, '2022-01-02')");
        String deleteStatement = String.format("DELETE FROM %s WHERE currency = 'UNKNOWN'", table);
        Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement).await()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class)});
    }

    @Test
    public void testDeleteWithPrimaryKeyFilter() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String"), Arrays.asList("id", "dt"), Collections.emptyList(), Collections.singletonList("dt"), Collections.emptyMap());
        ReadWriteTableTestUtil.insertInto(table, "(1, 'US Dollar', 114, '2022-01-01')", "(2, 'UNKNOWN', -1, '2022-01-01')", "(3, 'Euro', 119, '2022-01-02')", "(4, 'CNY', 119, '2022-01-02')", "(5, 'HKD', 119, '2022-01-03')", "(6, 'CAD', 119, '2022-01-03')", "(7, 'INR', 119, '2022-01-03')", "(8, 'MOP', 119, '2022-01-03')");
        String deleteStatement = String.format("DELETE FROM %s WHERE id = 2 and dt = '2022-01-01'", table);
        List<Row> expectedRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Euro", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4L, "CNY", 119L, "2022-01-02"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5L, "HKD", 119L, "2022-01-03"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6L, "CAD", 119L, "2022-01-03"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7L, "INR", 119L, "2022-01-03"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8L, "MOP", 119L, "2022-01-03"}));
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement).await();
        String querySql = String.format("SELECT * FROM %s", table);
        ReadWriteTableTestUtil.testBatchRead(querySql, expectedRecords);
        String deleteStatement2 = String.format("DELETE FROM %s", table);
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement2).await();
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), Collections.emptyList());
        String deleteStatement3 = String.format("DELETE FROM %s WHERE dt = '2022-01-03'", table);
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement3).await();
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), Collections.emptyList());
    }

    @Test
    public void testDeletePushDownWithPartitionKey() throws Exception {
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("id BIGINT NOT NULL", "currency STRING", "rate BIGINT", "dt String", "hh String"), Arrays.asList("id", "dt", "hh"), Collections.emptyList(), Arrays.asList("dt", "hh"), Collections.emptyMap());
        ReadWriteTableTestUtil.insertInto(table, "(1, 'US Dollar', 114, '2022-01-01', '11')", "(2, 'UNKNOWN', -1, '2022-01-01', '12')", "(3, 'Euro', 119, '2022-01-02', '13')", "(4, 'CNY', 119, '2022-01-03', '14')", "(5, 'HKD', 119, '2022-01-03', '15')", "(6, 'CAD', 119, '2022-01-03', '16')", "(7, 'INR', 119, '2022-01-03', '17')", "(8, 'MOP', 119, '2022-01-03', '18')");
        String deleteStatement = String.format("DELETE FROM %s WHERE dt = '2022-01-03' AND currency = 'CNY'", table);
        List<Row> expectedRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01", "11"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2L, "UNKNOWN", -1L, "2022-01-01", "12"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3L, "Euro", 119L, "2022-01-02", "13"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5L, "HKD", 119L, "2022-01-03", "15"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6L, "CAD", 119L, "2022-01-03", "16"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7L, "INR", 119L, "2022-01-03", "17"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8L, "MOP", 119L, "2022-01-03", "18"}));
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement).await();
        String querySql = String.format("SELECT * FROM %s", table);
        ReadWriteTableTestUtil.testBatchRead(querySql, expectedRecords);
        String deleteStatement1 = String.format("DELETE FROM %s WHERE dt = '2022-01-02' or hh = '15'", table);
        List<Row> expectedRecords1 = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01", "11"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2L, "UNKNOWN", -1L, "2022-01-01", "12"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6L, "CAD", 119L, "2022-01-03", "16"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7L, "INR", 119L, "2022-01-03", "17"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8L, "MOP", 119L, "2022-01-03", "18"}));
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement1).await();
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), expectedRecords1);
        String deleteStatement2 = String.format("DELETE FROM %s WHERE dt = '2022-01-03' and hh = '16'", table);
        List<Row> expectedRecords2 = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01", "11"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2L, "UNKNOWN", -1L, "2022-01-01", "12"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7L, "INR", 119L, "2022-01-03", "17"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8L, "MOP", 119L, "2022-01-03", "18"}));
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement2).await();
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), expectedRecords2);
        String deleteStatement3 = String.format("DELETE FROM %s WHERE dt = '2022-01-03'", table);
        List<Row> expectedRecords3 = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1L, "US Dollar", 114L, "2022-01-01", "11"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2L, "UNKNOWN", -1L, "2022-01-01", "12"}));
        ReadWriteTableTestUtil.bEnv.executeSql(deleteStatement3).await();
        ReadWriteTableTestUtil.testBatchRead(String.format("SELECT * FROM %s", table), expectedRecords3);
    }

    private void validatePurgingResult(String table, String partitionSpec, String projectionSpec, List<Row> expected) throws Exception {
        ReadWriteTableTestUtil.insertInto(table, "(0, '2023-01-01', 'hi')", "(0, '2023-01-01', 'hello')", "(0, '2023-01-02', 'world')", "(1, '2023-01-01', 'flink')", "(1, '2023-01-02', 'table')", "(1, '2023-01-02', 'store')");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{0, "2023-01-01", "hi"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{0, "2023-01-01", "hello"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{0, "2023-01-02", "world"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-01", "flink"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "table"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "2023-01-02", "store"})));
        ReadWriteTableTestUtil.bEnv.executeSql(String.format("INSERT OVERWRITE `%s` %s SELECT %s FROM `%s` WHERE false", table, partitionSpec, projectionSpec, table)).await();
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery(table), expected);
    }

    private int sourceParallelism(String sql) {
        DataStream stream = ((StreamTableEnvironment)ReadWriteTableTestUtil.bEnv).toChangelogStream(ReadWriteTableTestUtil.bEnv.sqlQuery(sql));
        return stream.getParallelism();
    }

    private int sourceParallelismStreaming(String sql) {
        DataStream stream = ((StreamTableEnvironment)ReadWriteTableTestUtil.sEnv).toChangelogStream(ReadWriteTableTestUtil.sEnv.sqlQuery(sql));
        return stream.getParallelism();
    }

    private void testSinkParallelism(boolean isFixedBucket, boolean hasPrimaryKey, Integer configParallelism) throws Exception {
        HashMap<String, String> options = new HashMap<String, String>();
        if (configParallelism != null) {
            options.put(FlinkConnectorOptions.SINK_PARALLELISM.key(), configParallelism.toString());
        }
        options.put("path", this.getTempFilePath(UUID.randomUUID().toString()));
        if (isFixedBucket) {
            options.put("bucket", "1");
            options.put("bucket-key", "a");
        }
        FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(ObjectIdentifier.of((String)"default", (String)"default", (String)"t1"), FlinkTestBase.createResolvedTable(options, RowType.of((LogicalType[])new LogicalType[]{new VarCharType(Integer.MAX_VALUE)}, (String[])new String[]{"a"}), Collections.emptyList(), hasPrimaryKey ? Collections.singletonList("a") : Collections.emptyList()), Collections.emptyMap(), (ReadableConfig)new Configuration(), Thread.currentThread().getContextClassLoader(), false);
        Path path = CoreOptions.path((Map)context.getCatalogTable().getOptions());
        LocalFileIO.create().mkdirs(path);
        new SchemaManager((FileIO)LocalFileIO.create(), path).createTable(FlinkCatalog.fromCatalogTable((CatalogBaseTable)context.getCatalogTable()));
        FlinkTableSink tableSink = new FlinkTableSink(context.getObjectIdentifier(), new FlinkTableFactory().buildPaimonTable((DynamicTableFactory.Context)context), (DynamicTableFactory.Context)context, null);
        Assertions.assertThat((Object)tableSink).isInstanceOf(FlinkTableSink.class);
        DynamicTableSink.SinkRuntimeProvider provider = tableSink.getSinkRuntimeProvider((DynamicTableSink.Context)new SinkRuntimeProviderContext(false));
        Assertions.assertThat((Object)provider).isInstanceOf(DataStreamSinkProvider.class);
        DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider)provider;
        DataStreamSource mockSource = ReadWriteTableTestUtil.bExeEnv.fromCollection(Collections.singletonList(GenericRowData.of((Object[])new Object[0])));
        mockSource.getTransformation().setParallelism(mockSource.getParallelism(), false);
        DataStreamSink sink = sinkProvider.consumeDataStream(null, (DataStream)mockSource);
        boolean hasPartitionTransformation = isFixedBucket || hasPrimaryKey;
        boolean expectedIsParallelismConfigured = configParallelism != null || !isFixedBucket && hasPrimaryKey;
        Transformation transformation = sink.getTransformation();
        boolean isPartitionTransformationFound = true;
        boolean isWriterFound = false;
        while (!(transformation instanceof PartitionTransformation)) {
            if (transformation.getName().contains("Writer")) {
                isWriterFound = true;
                Assertions.assertThat((boolean)transformation.isParallelismConfigured()).isEqualTo(expectedIsParallelismConfigured);
            }
            Assertions.assertThat((int)transformation.getParallelism()).isIn(new Object[]{1, configParallelism == null ? ReadWriteTableTestUtil.bExeEnv.getParallelism() : configParallelism.intValue()});
            List inputTransformations = transformation.getInputs();
            if (inputTransformations.isEmpty()) {
                isPartitionTransformationFound = false;
                break;
            }
            transformation = (Transformation)inputTransformations.get(0);
        }
        Assertions.assertThat((boolean)isPartitionTransformationFound).isEqualTo(hasPartitionTransformation);
        Assertions.assertThat((boolean)isWriterFound).isTrue();
    }

    private static Stream<Arguments> testSinkParallelismParameters() {
        List<Boolean> allBooleans = Arrays.asList(false, true);
        ArrayList<Arguments> parameters = new ArrayList<Arguments>();
        for (boolean isFixedBucket : allBooleans) {
            for (boolean hasPrimaryKey : allBooleans) {
                for (boolean isSinkParallelismSet : allBooleans) {
                    parameters.add(Arguments.of((Object[])new Object[]{isFixedBucket, hasPrimaryKey, isSinkParallelismSet}));
                }
            }
        }
        return parameters.stream();
    }

    private void assertChangeBucketWithoutRescale(String table, int bucketNum) throws Exception {
        ReadWriteTableTestUtil.bEnv.executeSql(String.format("ALTER TABLE `%s` SET ('bucket' = '%d')", table, bucketNum));
        Assertions.assertThat((List)BlockingIterator.of((Iterator)ReadWriteTableTestUtil.bEnv.executeSql(ReadWriteTableTestUtil.buildSimpleQuery(table)).collect()).collect()).containsExactlyInAnyOrder((Object[])new Row[]{TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-06-20"})});
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ReadWriteTableTestUtil.insertInto(table, "('US Dollar', 102, '2022-06-20')")).rootCause().isInstanceOf(RuntimeException.class)).hasMessage(String.format("Try to write partition {dt=2022-06-20} with a new bucket num %d, but the previous bucket num is 2. Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", bucketNum));
    }

    private void validateSchemaOptionResult() {
        List descResults = CollectionUtil.iteratorToList((Iterator)ReadWriteTableTestUtil.bEnv.executeSql("DESC T").collect()).stream().map(Object::toString).collect(Collectors.toList());
        Assertions.assertThat(descResults).isEqualTo(Arrays.asList("+I[id, INT, false, PRI(id), null, null]", "+I[price, INT, true, null, null, null]", "+I[record_time, TIMESTAMP_LTZ(3), true, null, METADATA FROM 'timestamp' VIRTUAL, null]", "+I[comp, INT, true, null, AS `price` * 2, null]", "+I[order_time, TIMESTAMP(3), true, null, null, `order_time` - INTERVAL '5' SECOND]"));
        String showResult = ((Row)CollectionUtil.iteratorToList((Iterator)ReadWriteTableTestUtil.bEnv.executeSql("SHOW CREATE TABLE T").collect()).get(0)).toString();
        Assertions.assertThat((boolean)showResult.contains("schema.")).isFalse();
    }
}

