/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ExplainFormat;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class FilterPushDownITCase
extends CatalogITCaseBase {
    @Override
    public List<String> ddl() {
        return ImmutableList.of((Object)"CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a);");
    }

    @Override
    @BeforeEach
    public void before() throws IOException {
        super.before();
        this.batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2'), (2, 3, '3'), (3, 3, '3')", new Object[0]);
    }

    @Test
    public void testPartitionConditionConsuming_OnePartitionCondition() {
        String sql = "SELECT * FROM T where a = 1 limit 1";
        this.assertPlanAndResult(sql, "+- Limit(offset=[0], fetch=[1], global=[false])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[=(a, 1)], project=[b, c], limit=[1]]], fields=[b, c])", Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1, "1"}));
    }

    @Test
    public void testPartitionConditionConsuming_PartitionConditionAndOther() {
        String sql = "SELECT * FROM T where (a = 1 or a = 2) and c = '1' limit 1";
        this.assertPlanAndResult(sql, "+- Calc(select=[a, b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[(c = '1')])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[and(OR(=(a, 1), =(a, 2)), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])", Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1, "1"}));
    }

    @Test
    public void testPartitionConditionNotConsuming1() {
        String sql = "SELECT * FROM T where a + 1 = 2 limit 1";
        this.assertPlanAndResult(sql, "+- Calc(select=[a, b, c], where=[((a + 1) = 2)])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[=(+(a, 1), 2)]]], fields=[a, b, c])", Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1, "1"}));
    }

    @Test
    public void testPartitionConditionNotConsuming2() {
        String sql = "SELECT * FROM T where UNIX_TIMESTAMP() > 0";
        this.assertPlanAndResult(sql, "Calc(select=[a, b, c], where=[(UNIX_TIMESTAMP() > 0)])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[>(UNIX_TIMESTAMP(), 0)]]], fields=[a, b, c])", Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1, "1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 2, "2"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, 3, "3"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, 3, "3"}));
    }

    @Test
    public void testPartitionConditionNotConsuming3() {
        String sql = "SELECT * FROM T where b = 3 and ( a = 2 or c = '3')";
        this.assertPlanAndResult(sql, "Calc(select=[a, CAST(3 AS INTEGER) AS b, c], where=[((b = 3) AND ((a = 2) OR (c = '3')))])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[and(=(b, 3), OR(=(a, 2), =(c, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")))]]], fields=[a, b, c])", Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, 3, "3"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, 3, "3"}));
    }

    @Test
    public void testStreamingReadingNotConsumePartitionCondition() throws TimeoutException {
        String sql = "SELECT * FROM T WHERE a = 5";
        String plan = this.sEnv.explainSql(sql, ExplainFormat.TEXT, new ExplainDetail[0]);
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"Calc(select=[CAST(5 AS INTEGER) AS a, b, c], where=[(a = 5)])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[=(a, 5)]]], fields=[a, b, c])"});
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(sql).collect());
        this.sql("INSERT INTO T VALUES (5, 5, '5'), (6, 6, '6'), (5, 5, '5_1')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{5, 5, "5"}), Row.of((Object[])new Object[]{5, 5, "5_1"})});
    }

    @Test
    public void testPartitionCondition_ProjectionPushDown() {
        String sql = "SELECT b, a FROM T where a = 1 limit 1";
        this.assertPlanAndResult(sql, "+- Limit(offset=[0], fetch=[1], global=[false])\n+- TableSourceScan(table=[[PAIMON, default, T, filter=[=(a, 1)], project=[b], limit=[1]]], fields=[b])", Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1}));
    }

    private void assertPlanAndResult(String sql, String planIdentifier, Row ... expectedRows) {
        String plan = this.tEnv.explainSql(sql, ExplainFormat.TEXT, new ExplainDetail[0]);
        String[] lines = plan.split("\n");
        String trimmed = Arrays.stream(lines).map(String::trim).collect(Collectors.joining("\n"));
        Assertions.assertThat((String)trimmed).contains(new CharSequence[]{planIdentifier});
        List<Row> result = this.batchSql(sql, new Object[0]);
        Assertions.assertThat(result).containsExactlyInAnyOrder((Object[])expectedRows);
    }
}

