/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ComputedColumnAndWatermarkTableITCase
extends KafkaTableTestBase {
    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init((String)this.createAndRegisterTempFile("").toString());
    }

    @Test
    public void testBatchSelectComputedColumn() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L}));
        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT"), Collections.emptyList(), Collections.emptyList(), initialRecords, null, (boolean)true, (String)"I");
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "capital_currency AS UPPER(currency)"), Collections.emptyList(), Collections.singletonList("currency"), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable((String)temporaryTable, (String)table);
        ReadWriteTableTestUtil.testBatchRead((String)ReadWriteTableTestUtil.buildQuery((String)table, (String)"capital_currency", (String)""), initialRecords.stream().map(row -> TestValuesTableFactory.changelogRow((String)row.getKind().shortString(), (Object[])new Object[]{((String)row.getField(0)).toUpperCase()})).collect(Collectors.toList()));
        table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "capital_currency AS LOWER(currency)"), Collections.singletonList("currency"), Collections.emptyList(), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable((String)temporaryTable, (String)table);
        ReadWriteTableTestUtil.testBatchRead((String)ReadWriteTableTestUtil.buildQuery((String)table, (String)"capital_currency", (String)""), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"us dollar"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"yen"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"euro"})));
        initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01", "00"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 114L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 114L, "2022-01-01", "20"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, "2022-01-02", "12"}));
        temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING"), Arrays.asList("currency", "dt", "hh"), Arrays.asList("dt", "hh"), initialRecords, (String)"dt:2022-01-01,hh:00;dt:2022-01-01,hh:20;dt:2022-01-02,hh:12", (boolean)true, (String)"I");
        table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING", "dth AS dt || ' ' || hh"), Arrays.asList("currency", "dt", "hh"), Collections.emptyList(), Arrays.asList("dt", "hh"));
        ReadWriteTableTestUtil.insertIntoFromTable((String)temporaryTable, (String)table);
        ReadWriteTableTestUtil.testBatchRead((String)ReadWriteTableTestUtil.buildQuery((String)table, (String)"dth", (String)"WHERE dth = '2022-01-02 12'"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"2022-01-02 12"})));
        table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING", "ptime AS PROCTIME()"), Collections.singletonList("currency"), Collections.emptyList(), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable((String)temporaryTable, (String)table);
        ReadWriteTableTestUtil.testBatchRead((String)ReadWriteTableTestUtil.buildQuery((String)table, (String)"CHAR_LENGTH(DATE_FORMAT(ptime, 'yyyy-MM-dd HH:mm'))", (String)"WHERE currency = 'US Dollar'"), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{16})));
    }

    @Test
    public void testBatchSelectWithWatermark() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, LocalDateTime.parse("2020-04-07T10:10:11.120")}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, LocalDateTime.parse("2022-04-07T09:54:11.120")}));
        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)"), Collections.emptyList(), Collections.emptyList(), initialRecords, null, (boolean)true, (String)"I");
        String table = ReadWriteTableTestUtil.createTable(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "WATERMARK FOR ts AS ts - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.singletonList("currency"), Collections.emptyList());
        ReadWriteTableTestUtil.insertIntoFromTable((String)temporaryTable, (String)table);
        ReadWriteTableTestUtil.testBatchRead((String)ReadWriteTableTestUtil.buildSimpleQuery((String)table), initialRecords);
    }

    @Test
    public void testStreamingSelectWithWatermark() throws Exception {
        List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Euro", 119L, LocalDateTime.parse("2020-04-07T10:10:11.120")}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"Yen", 1L, LocalDateTime.parse("2022-04-07T09:54:11.120")}));
        String temporaryTable = ReadWriteTableTestUtil.createTemporaryTable(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)"), Collections.emptyList(), Collections.emptyList(), initialRecords, null, (boolean)true, (String)"I");
        String table = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "WATERMARK FOR ts AS ts - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst((String)temporaryTable, (String)table, (String)ReadWriteTableTestUtil.buildQueryWithTableOptions((String)table, (String)"*", (String)"WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)", (Map)ReadWriteTableTestUtil.SCAN_LATEST), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}))).close();
        table = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "ts1 AS ts", "WATERMARK FOR ts1 AS ts1 - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst((String)temporaryTable, (String)table, (String)ReadWriteTableTestUtil.buildQueryWithTableOptions((String)table, (String)"currency, rate, ts1", (String)"WHERE CURRENT_WATERMARK(ts1) IS NULL OR ts1 > CURRENT_WATERMARK(ts1)", (Map)ReadWriteTableTestUtil.SCAN_LATEST), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120")}))).close();
        table = KafkaLogTestUtils.createTableWithKafkaLog(Arrays.asList("currency STRING", "rate BIGINT", "ts TIMESTAMP(3)", "ptime AS PROCTIME()", "WATERMARK FOR ts AS ts - INTERVAL '3' YEAR"), Collections.emptyList(), Collections.emptyList(), true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst((String)temporaryTable, (String)table, (String)ReadWriteTableTestUtil.buildQueryWithTableOptions((String)table, (String)"currency, rate, ts, CHAR_LENGTH(DATE_FORMAT(ptime, 'yyyy-MM-dd HH:mm'))", (String)"WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)", (Map)ReadWriteTableTestUtil.SCAN_LATEST), Collections.singletonList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{"US Dollar", 102L, LocalDateTime.parse("1990-04-07T10:00:11.120"), 16}))).close();
    }
}

