/*
 * Decompiled with CFR 0.152.
 */
package io.trino.tests.product.kafka;

import io.trino.tempto.ProductTest;
import io.trino.tempto.Requirement;
import io.trino.tempto.RequirementsProvider;
import io.trino.tempto.Requires;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.configuration.Configuration;
import io.trino.tempto.fulfillment.table.TableDefinition;
import io.trino.tempto.fulfillment.table.TableRequirements;
import io.trino.tempto.fulfillment.table.kafka.KafkaDataSource;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessage;
import io.trino.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder;
import io.trino.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.trino.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.trino.tempto.query.QueryExecutor;
import io.trino.tests.product.utils.QueryExecutors;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.assertj.core.api.AssertProvider;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

public class TestKafkaPushdownSmokeTest
extends ProductTest {
    private static final String KAFKA_CATALOG = "kafka";
    private static final String SCHEMA_NAME = "product_tests";
    private static final long NUM_MESSAGES = 100L;
    private static final long TIMESTAMP_NUM_MESSAGES = 10L;
    private static final String PUSHDOWN_PARTITION_TABLE_NAME = "pushdown_partition";
    private static final String PUSHDOWN_PARTITION_TOPIC_NAME = "pushdown_partition";
    private static final String PUSHDOWN_OFFSET_TABLE_NAME = "pushdown_offset";
    private static final String PUSHDOWN_OFFSET_TOPIC_NAME = "pushdown_offset";
    private static final String PUSHDOWN_CREATE_TIME_TABLE_NAME = "pushdown_create_time";

    @Test(groups={"kafka", "profile_specific_tests"})
    @Requires(value={PushdownPartitionTable.class})
    public void testPartitionPushdown() {
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_id = 1", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_partition"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{50L})});
    }

    @Test(groups={"kafka", "profile_specific_tests"})
    @Requires(value={PushdownOffsetTable.class})
    public void testOffsetPushdown() {
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset BETWEEN 6 AND 10", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{10})});
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset > 5 AND _partition_offset < 10", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{8})});
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset >= 5 AND _partition_offset <= 10", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{12})});
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset >= 5 AND _partition_offset < 10", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{10})});
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset > 5 AND _partition_offset <= 10", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{10})});
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _partition_offset = 5", KAFKA_CATALOG, SCHEMA_NAME, "pushdown_offset"), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{2})});
    }

    @Test(groups={"kafka", "profile_specific_tests"})
    public void testCreateTimePushdown() throws InterruptedException {
        int i = 1;
        while ((long)i <= 10L) {
            QueryExecutors.onTrino().executeQuery(String.format("INSERT INTO %s.%s.%s (bigint_key, bigint_value) VALUES (%s, %s)", KAFKA_CATALOG, SCHEMA_NAME, PUSHDOWN_CREATE_TIME_TABLE_NAME, i, i), new QueryExecutor.QueryParam[0]);
            Thread.sleep(100L);
            ++i;
        }
        long startKey = 4L;
        long endKey = 6L;
        List rows = QueryExecutors.onTrino().executeQuery(String.format("SELECT CAST(_timestamp AS VARCHAR) FROM %s.%s.%s WHERE bigint_key IN (" + startKey + ", " + endKey + ") ORDER BY bigint_key", KAFKA_CATALOG, SCHEMA_NAME, PUSHDOWN_CREATE_TIME_TABLE_NAME), new QueryExecutor.QueryParam[0]).rows();
        String startTime = (String)((List)rows.get(0)).get(0);
        String endTime = (String)((List)rows.get(1)).get(0);
        ((QueryAssert)Assertions.assertThat((AssertProvider)QueryExecutors.onTrino().executeQuery(String.format("SELECT COUNT(*) FROM %s.%s.%s WHERE _timestamp >= TIMESTAMP '%s' AND _timestamp < TIMESTAMP '%s'", KAFKA_CATALOG, SCHEMA_NAME, PUSHDOWN_CREATE_TIME_TABLE_NAME, startTime, endTime), new QueryExecutor.QueryParam[0]))).containsExactlyInOrder(new QueryAssert.Row[]{QueryAssert.Row.row((Object[])new Object[]{endKey - startKey})});
    }

    private static class PushdownOffsetTable
    implements RequirementsProvider {
        private PushdownOffsetTable() {
        }

        public Requirement getRequirements(Configuration configuration) {
            List records = LongStream.rangeClosed(1L, 100L).boxed().map(i -> new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", i % 2L)).build(), KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", i)).build())).collect(Collectors.toList());
            return TableRequirements.immutableTable((TableDefinition)new KafkaTableDefinition("product_tests.pushdown_offset", "pushdown_offset", (KafkaDataSource)new ListKafkaDataSource(records), 2, 1));
        }
    }

    private static class PushdownPartitionTable
    implements RequirementsProvider {
        private PushdownPartitionTable() {
        }

        public Requirement getRequirements(Configuration configuration) {
            List records = LongStream.rangeClosed(1L, 100L).boxed().map(i -> new KafkaMessage(KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", i % 2L)).build(), KafkaMessageContentsBuilder.contentsBuilder().appendUTF8(String.format("%s", i)).build())).collect(Collectors.toList());
            return TableRequirements.immutableTable((TableDefinition)new KafkaTableDefinition("product_tests.pushdown_partition", "pushdown_partition", (KafkaDataSource)new ListKafkaDataSource(records), 2, 1));
        }
    }
}

