/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.GenericsHelpers;
import org.apache.iceberg.spark.source.SparkScanBuilder;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.StringStartsWith;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
public class TestFilteredScan {
    // Hadoop configuration handed to the HadoopTables instance below.
    private static final Configuration CONF = new Configuration();
    // Used throughout this class to create and load tables by location string.
    private static final HadoopTables TABLES = new HadoopTables(CONF);
    // Test schema: required long "id" (field 1), optional timestamp-with-zone "ts" (field 2),
    // optional string "data" (field 3).
    private static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)1, (String)"id", (Type)Types.LongType.get()), Types.NestedField.optional((int)2, (String)"ts", (Type)Types.TimestampType.withZone()), Types.NestedField.optional((int)3, (String)"data", (Type)Types.StringType.get())});
    // Partition specs exercised by the partitioned-table tests.
    // "id" hashed into 4 buckets.
    private static final PartitionSpec BUCKET_BY_ID = PartitionSpec.builderFor((Schema)SCHEMA).bucket("id", 4).build();
    // Day transform of "ts".
    private static final PartitionSpec PARTITION_BY_DAY = PartitionSpec.builderFor((Schema)SCHEMA).day("ts").build();
    // Hour transform of "ts".
    private static final PartitionSpec PARTITION_BY_HOUR = PartitionSpec.builderFor((Schema)SCHEMA).hour("ts").build();
    // Identity partitioning on "data".
    private static final PartitionSpec PARTITION_BY_DATA = PartitionSpec.builderFor((Schema)SCHEMA).identity("data").build();
    // Identity partitioning on "id".
    private static final PartitionSpec PARTITION_BY_ID = PartitionSpec.builderFor((Schema)SCHEMA).identity("id").build();
    // Shared Spark session; assigned in startSpark() and cleared in stopSpark().
    private static SparkSession spark = null;
    // Per-test temporary directory injected by JUnit.
    @TempDir
    private Path temp;
    // Test parameters, injected from the rows returned by parameters().
    // Index 0: file format name (e.g. "parquet", "avro", "orc").
    @Parameter(index=0)
    private String format;
    // Index 1: value passed as the "vectorization-enabled" read option.
    @Parameter(index=1)
    private boolean vectorized;
    // Index 2: planning mode written into the unpartitioned table's read properties.
    @Parameter(index=2)
    private PlanningMode planningMode;
    // Root folder for all tables of the current test; set in writeUnpartitionedTable().
    private File parent = null;
    // Location of the unpartitioned source table; set in writeUnpartitionedTable().
    private File unpartitioned = null;
    // Records written to the unpartitioned table; expected(...) indexes into this list.
    private List<Record> records = null;

    /**
     * Creates (or reuses) a local two-thread Spark session and stores it in the shared
     * static field before any test runs.
     */
    @BeforeAll
    public static void startSpark() {
        SparkSession session = SparkSession.builder()
                .master("local[2]")
                .getOrCreate();
        TestFilteredScan.spark = session;
    }

    /**
     * Stops the shared Spark session after all tests have run.
     *
     * <p>The static field is cleared before the session is stopped so it never refers to a
     * stopped session. The null guard keeps teardown from throwing a
     * {@link NullPointerException} when no session was ever assigned (for example because
     * {@code startSpark()} failed), which would otherwise obscure the original failure.
     */
    @AfterAll
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Supplies the parameter rows for the test templates.
     *
     * @return rows of {@code {format, vectorized, planningMode}}
     */
    @Parameters(name="format = {0}, vectorized = {1}, planningMode = {2}")
    public static Object[][] parameters() {
        Object[] parquetRowBased = {"parquet", false, PlanningMode.LOCAL};
        Object[] parquetVectorized = {"parquet", true, PlanningMode.DISTRIBUTED};
        Object[] avroRowBased = {"avro", false, PlanningMode.LOCAL};
        Object[] orcRowBased = {"orc", false, PlanningMode.DISTRIBUTED};
        Object[] orcVectorized = {"orc", true, PlanningMode.LOCAL};
        return new Object[][]{parquetRowBased, parquetVectorized, avroRowBased, orcRowBased, orcVectorized};
    }

    /**
     * Creates a fresh unpartitioned table under the per-test temporary directory and appends a
     * single data file, written in the parameterized file format, that holds all test records.
     *
     * <p>The written records are kept in {@code records} so tests can look up expected rows by
     * ordinal.
     *
     * @throws IOException if writing or closing the data file fails
     */
    @BeforeEach
    public void writeUnpartitionedTable() throws IOException {
        this.parent = this.temp.resolve("TestFilteredScan").toFile();
        this.unpartitioned = new File(this.parent, "unpartitioned");
        File dataFolder = new File(this.unpartitioned, "data");
        Assertions.assertThat(dataFolder.mkdirs()).as("Mkdir should succeed").isTrue();

        // Data and delete planning both follow the parameterized planning mode.
        Map<String, String> tableProperties = ImmutableMap.of(
                "read.data-planning-mode", this.planningMode.modeName(),
                "read.delete-planning-mode", this.planningMode.modeName());
        Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), tableProperties, this.unpartitioned.toString());
        Schema tableSchema = table.schema();

        FileFormat fileFormat = FileFormat.fromString(this.format);
        File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString()));

        this.records = this.testRecords(tableSchema);
        try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(Files.localOutput(testFile), fileFormat)) {
            writer.addAll(this.records);
        }

        // The file length is read only after the appender has been closed by the try block.
        DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
                .withRecordCount(this.records.size())
                .withFileSizeInBytes(testFile.length())
                .withPath(testFile.toString())
                .build();
        table.newAppend().appendFile(file).commit();
    }

    /**
     * For each id 0..9, pushes an {@code id = i} filter into a scan of the unpartitioned table,
     * checks that planning produces exactly one input partition, and checks that reading with
     * the same filter returns the matching record.
     */
    @TestTemplate
    public void testUnpartitionedIDFilters() {
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", this.unpartitioned.toString()));
        // A single builder is reused; a new filter is pushed onto it on every iteration.
        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

        for (int id = 0; id < 10; id++) {
            Filter idEquals = EqualTo.apply("id", id);
            pushFilters(builder, idEquals);

            Object[] partitions = builder.build().toBatch().planInputPartitions();
            Assertions.assertThat(partitions).as("Should only create one task for a small file").hasSize(1);

            assertEqualsSafe(SCHEMA.asStruct(), expected(id), read(this.unpartitioned.toString(), this.vectorized, "id = " + id));
        }
    }

    /*
     * Decompiler (CFR) warning: a try/catch construct in this method was simplified during
     * decompilation, so its behaviour may differ from the original compiled class.
     */
    /**
     * Same check as the id-filter test, but with case-insensitive resolution: the pushed filter
     * names the column {@code "ID"} while the schema declares {@code "id"}.
     *
     * <p>The session's {@code spark.sql.caseSensitive} setting is switched to {@code "false"}
     * for the duration of the test and restored in a {@code finally} block.
     */
    @TestTemplate
    public void testUnpartitionedCaseInsensitiveIDFilters() {
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", this.unpartitioned.toString()));

        String previousCaseSensitivity = spark.conf().get("spark.sql.caseSensitive");
        spark.conf().set("spark.sql.caseSensitive", "false");
        try {
            for (int id = 0; id < 10; id++) {
                SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
                        .caseSensitive(false);
                // Upper-case column name on purpose.
                pushFilters(builder, EqualTo.apply("ID", id));

                Object[] tasks = builder.build().toBatch().planInputPartitions();
                Assertions.assertThat(tasks).as("Should only create one task for a small file").hasSize(1);

                assertEqualsSafe(SCHEMA.asStruct(), expected(id), read(this.unpartitioned.toString(), this.vectorized, "id = " + id));
            }
        } finally {
            spark.conf().set("spark.sql.caseSensitive", previousCaseSensitivity);
        }
    }

    /**
     * Pushes a {@code ts < 2017-12-22T00:00:00+00:00} filter into a scan of the unpartitioned
     * table, expects a single input partition, and expects the read to return records 5 to 9.
     */
    @TestTemplate
    public void testUnpartitionedTimestampFilter() {
        String tablePath = this.unpartitioned.toString();
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", tablePath));
        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

        Filter beforeDec22 = LessThan.apply("ts", "2017-12-22T00:00:00+00:00");
        pushFilters(builder, beforeDec22);

        Object[] tasks = builder.build().toBatch().planInputPartitions();
        Assertions.assertThat(tasks).as("Should only create one task for a small file").hasSize(1);

        assertEqualsSafe(
                SCHEMA.asStruct(),
                expected(5, 6, 7, 8, 9),
                read(tablePath, this.vectorized, "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
    }

    /**
     * Builds a table bucketed by {@code id} and checks that an unfiltered scan plans 4 input
     * partitions, while an {@code id = i} filter (for each id 0..9) plans exactly one and the
     * read returns the matching record.
     */
    @TestTemplate
    public void testBucketPartitionedIDFilters() {
        Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));

        Object[] unfilteredTasks = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
                .build()
                .toBatch()
                .planInputPartitions();
        Assertions.assertThat(unfilteredTasks).as("Unfiltered table should created 4 read tasks").hasSize(4);

        for (int id = 0; id < 10; id++) {
            // A fresh builder per id, each loaded from the table location.
            SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
            pushFilters(builder, EqualTo.apply("id", id));

            Object[] tasks = builder.build().toBatch().planInputPartitions();
            Assertions.assertThat(tasks).as("Should only create one task for a single bucket").hasSize(1);

            assertEqualsSafe(SCHEMA.asStruct(), expected(id), read(table.location(), this.vectorized, "id = " + id));
        }
    }

    /**
     * Builds a table partitioned by day of {@code ts} and checks task planning and results for
     * two timestamp filters: everything before 2017-12-22, and the open interval between
     * 06:00 and 08:00 on 2017-12-22.
     */
    @TestTemplate
    public void testDayPartitionedTimestampFilters() {
        Table table = buildPartitionedTable("partitioned_by_day", PARTITION_BY_DAY);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));

        Object[] unfilteredTasks = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
                .build()
                .toBatch()
                .planInputPartitions();
        Assertions.assertThat(unfilteredTasks).as("Unfiltered table should created 2 read tasks").hasSize(2);

        // Case 1: ts strictly before midnight of 2017-12-22.
        SparkScanBuilder beforeBuilder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
        pushFilters(beforeBuilder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
        Object[] beforeTasks = beforeBuilder.build().toBatch().planInputPartitions();
        Assertions.assertThat(beforeTasks).as("Should create one task for 2017-12-21").hasSize(1);
        assertEqualsSafe(
                SCHEMA.asStruct(),
                expected(5, 6, 7, 8, 9),
                read(table.location(), this.vectorized, "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));

        // Case 2: ts strictly between 06:00 and 08:00 on 2017-12-22.
        SparkScanBuilder betweenBuilder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
        Filter afterSix = GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00");
        Filter beforeEight = LessThan.apply("ts", "2017-12-22T08:00:00+00:00");
        pushFilters(betweenBuilder, And.apply(afterSix, beforeEight));
        Object[] betweenTasks = betweenBuilder.build().toBatch().planInputPartitions();
        Assertions.assertThat(betweenTasks).as("Should create one task for 2017-12-22").hasSize(1);
        assertEqualsSafe(
                SCHEMA.asStruct(),
                expected(1, 2),
                read(table.location(), this.vectorized, "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and ts < cast('2017-12-22 08:00:00+00:00' as timestamp)"));
    }

    /**
     * Builds a table partitioned by hour of {@code ts} and checks task planning and results for
     * two timestamp filters: everything before 2017-12-22, and the open interval between
     * 06:00 and 08:00 on 2017-12-22.
     */
    @TestTemplate
    public void testHourPartitionedTimestampFilters() {
        Table table = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));

        Object[] unfilteredTasks = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
                .build()
                .toBatch()
                .planInputPartitions();
        Assertions.assertThat(unfilteredTasks).as("Unfiltered table should created 9 read tasks").hasSize(9);

        // Case 1: ts strictly before midnight of 2017-12-22.
        SparkScanBuilder beforeBuilder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
        pushFilters(beforeBuilder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
        Object[] beforeTasks = beforeBuilder.build().toBatch().planInputPartitions();
        Assertions.assertThat(beforeTasks).as("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22").hasSize(4);
        assertEqualsSafe(
                SCHEMA.asStruct(),
                expected(8, 9, 7, 6, 5),
                read(table.location(), this.vectorized, "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));

        // Case 2: ts strictly between 06:00 and 08:00 on 2017-12-22.
        SparkScanBuilder betweenBuilder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
        Filter afterSix = GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00");
        Filter beforeEight = LessThan.apply("ts", "2017-12-22T08:00:00+00:00");
        pushFilters(betweenBuilder, And.apply(afterSix, beforeEight));
        Object[] betweenTasks = betweenBuilder.build().toBatch().planInputPartitions();
        Assertions.assertThat(betweenTasks).as("Should create 2 tasks for 2017-12-22: 6, 7").hasSize(2);
        assertEqualsSafe(
                SCHEMA.asStruct(),
                expected(2, 1),
                read(table.location(), this.vectorized, "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and ts < cast('2017-12-22 08:00:00+00:00' as timestamp)"));
    }

    /**
     * Reads the unpartitioned table with filters on {@code ts} while selecting only columns
     * that do not include {@code ts}, and checks the rows against the expected records
     * projected onto the same columns.
     */
    @TestTemplate
    public void testFilterByNonProjectedColumn() {
        // Case 1: project (id, data), filter on ts before 2017-12-22.
        Schema idAndData = SCHEMA.select("id", "data");
        List<Record> expectedIdAndData = Lists.newArrayList();
        for (Record rec : expected(5, 6, 7, 8, 9)) {
            expectedIdAndData.add(projectFlat(idAndData, rec));
        }
        assertEqualsSafe(
                idAndData.asStruct(),
                expectedIdAndData,
                read(this.unpartitioned.toString(), this.vectorized, "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)", "id", "data"));

        // Case 2: project (id) only, filter on ts between 06:00 and 08:00 on 2017-12-22.
        Schema idOnly = SCHEMA.select("id");
        List<Record> expectedIdOnly = Lists.newArrayList();
        for (Record rec : expected(1, 2)) {
            expectedIdOnly.add(projectFlat(idOnly, rec));
        }
        assertEqualsSafe(
                idOnly.asStruct(),
                expectedIdOnly,
                read(this.unpartitioned.toString(), this.vectorized, "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and ts < cast('2017-12-22 08:00:00+00:00' as timestamp)", "id"));
    }

    /**
     * On a table identity-partitioned by {@code data}, a pushed "data starts with junc" filter
     * is expected to plan exactly one input partition.
     */
    @TestTemplate
    public void testPartitionedByDataStartsWithFilter() {
        Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

        Filter startsWithJunc = new StringStartsWith("data", "junc");
        pushFilters(builder, startsWithJunc);

        Object[] tasks = builder.build().toBatch().planInputPartitions();
        Assertions.assertThat(tasks).hasSize(1);
    }

    /**
     * On a table identity-partitioned by {@code data}, a pushed "data does not start with junc"
     * filter is expected to plan nine input partitions.
     */
    @TestTemplate
    public void testPartitionedByDataNotStartsWithFilter() {
        Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

        Filter notStartsWithJunc = new Not(new StringStartsWith("data", "junc"));
        pushFilters(builder, notStartsWithJunc);

        Object[] tasks = builder.build().toBatch().planInputPartitions();
        Assertions.assertThat(tasks).hasSize(9);
    }

    /**
     * On a table identity-partitioned by {@code id}, a pushed "data starts with junc" filter
     * is expected to plan exactly one input partition.
     */
    @TestTemplate
    public void testPartitionedByIdStartsWith() {
        Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

        Filter startsWithJunc = new StringStartsWith("data", "junc");
        pushFilters(builder, startsWithJunc);

        Object[] tasks = builder.build().toBatch().planInputPartitions();
        Assertions.assertThat(tasks).hasSize(1);
    }

    /**
     * On a table identity-partitioned by {@code id}, a pushed "data does not start with junc"
     * filter is expected to plan nine input partitions.
     */
    @TestTemplate
    public void testPartitionedByIdNotStartsWith() {
        Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
        CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);

        Filter notStartsWithJunc = new Not(new StringStartsWith("data", "junc"));
        pushFilters(builder, notStartsWithJunc);

        Object[] tasks = builder.build().toBatch().planInputPartitions();
        Assertions.assertThat(tasks).hasSize(9);
    }

    /**
     * Reads the unpartitioned table through the DataFrame API and checks that
     * {@code data LIKE 'jun%'} matches exactly one value, {@code "junction"}.
     */
    @TestTemplate
    public void testUnpartitionedStartsWith() {
        Dataset<Row> df = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(this.unpartitioned.toString());

        List<String> matchedData = df.select("data")
                .where("data LIKE 'jun%'")
                .as(Encoders.STRING())
                .collectAsList();

        Assertions.assertThat(matchedData).hasSize(1);
        Assertions.assertThat(matchedData.get(0)).isEqualTo("junction");
    }

    /**
     * Reads the unpartitioned table through the DataFrame API and checks that
     * {@code data NOT LIKE 'jun%'} returns nine values, equal as a set to the test records'
     * {@code data} values that do not start with {@code "jun"}.
     */
    @TestTemplate
    public void testUnpartitionedNotStartsWith() {
        Dataset<Row> df = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(this.unpartitioned.toString());

        List<String> matchedData = df.select("data")
                .where("data NOT LIKE 'jun%'")
                .as(Encoders.STRING())
                .collectAsList();

        List<String> expected = testRecords(SCHEMA).stream()
                .map(r -> r.getField("data").toString())
                .filter(d -> !d.startsWith("jun"))
                .collect(Collectors.toList());

        Assertions.assertThat(matchedData).hasSize(9);
        // Compared as sets: the order of the collected rows is not asserted.
        Assertions.assertThat(Sets.newHashSet(matchedData)).isEqualTo(Sets.newHashSet(expected));
    }

    /**
     * Copies the top-level fields named by {@code projection} out of {@code record} into a new
     * record created for the projection schema.
     *
     * @param projection schema whose top-level fields determine what is copied and in which order
     * @param record source record; fields are looked up by name
     * @return a new record holding the projected values at the projection's field positions
     */
    private static Record projectFlat(Schema projection, Record record) {
        Record result = GenericRecord.create(projection);
        List<Types.NestedField> fields = projection.asStruct().fields();
        for (int i = 0; i < fields.size(); ++i) {
            Types.NestedField field = fields.get(i);
            result.set(i, record.getField(field.name()));
        }
        return result;
    }

    /**
     * Compares expected records with actual unsafe rows position by position over the common
     * prefix of both lists, then asserts that both lists have the same size.
     *
     * @param struct struct type used for the per-row comparison
     * @param expected expected records
     * @param actual rows produced by the scan
     */
    public static void assertEqualsUnsafe(Types.StructType struct, List<Record> expected, List<UnsafeRow> actual) {
        // Rows are compared first; the size check comes last.
        for (int idx = 0; idx < expected.size() && idx < actual.size(); idx++) {
            InternalRow row = actual.get(idx);
            GenericsHelpers.assertEqualsUnsafe(struct, expected.get(idx), row);
        }
        Assertions.assertThat(actual).as("Number of results should match expected").hasSameSizeAs(expected);
    }

    /**
     * Compares expected records with actual rows position by position over the common prefix
     * of both lists, then asserts that both lists have the same size.
     *
     * @param struct struct type used for the per-row comparison
     * @param expected expected records
     * @param actual rows produced by the read
     */
    public static void assertEqualsSafe(Types.StructType struct, List<Record> expected, List<Row> actual) {
        // Rows are compared first; the size check comes last.
        for (int idx = 0; idx < expected.size() && idx < actual.size(); idx++) {
            Record expectedRecord = expected.get(idx);
            Row actualRow = actual.get(idx);
            GenericsHelpers.assertEqualsSafe(struct, expectedRecord, actualRow);
        }
        Assertions.assertThat(actual).as("Number of results should match expected").hasSameSizeAs(expected);
    }

    /**
     * Looks up written test records by their position in {@code records}.
     *
     * @param ordinals indexes into {@code records}, in the order the results should appear
     * @return the selected records, in the given order
     */
    private List<Record> expected(int ... ordinals) {
        List<Record> expected = Lists.newArrayListWithExpectedSize(ordinals.length);
        for (int ord : ordinals) {
            expected.add(this.records.get(ord));
        }
        return expected;
    }

    /**
     * Converts the given filters with {@code Filter.toV2()} and pushes them into the scan
     * builder, after asserting that the builder implements {@link SupportsPushDownV2Filters}.
     *
     * @param scan scan builder to push into
     * @param filters filters to convert and push
     */
    private void pushFilters(ScanBuilder scan, Filter ... filters) {
        Assertions.assertThat(scan).isInstanceOf(SupportsPushDownV2Filters.class);
        Predicate[] predicates = Arrays.stream(filters)
                .map(Filter::toV2)
                .toArray(Predicate[]::new);
        ((SupportsPushDownV2Filters) scan).pushPredicates(predicates);
    }

    /**
     * Creates a table with the given partition spec under the per-test parent folder and fills
     * it by reading all rows of the unpartitioned table and appending them through Spark.
     *
     * @param desc folder name for the new table, relative to {@code parent}
     * @param spec partition spec for the new table
     * @return the new table, refreshed after the append
     */
    private Table buildPartitionedTable(String desc, PartitionSpec spec) {
        File location = new File(this.parent, desc);
        Table table = TABLES.create(SCHEMA, spec, location.toString());
        table.updateProperties().set("read.split.target-size", "2048").commit();

        Dataset<Row> allRows = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(this.vectorized))
                .load(this.unpartitioned.toString());
        allRows.write()
                .option("fanout-enabled", "false")
                .format("iceberg")
                .mode("append")
                .save(table.location());

        // Refresh the handle so it reflects the commit made by the Spark write above.
        table.refresh();
        return table;
    }

    /**
     * Builds the ten test records (ids 0 to 9) for the given schema.
     *
     * <p>Records 0 to 4 have timestamps on 2017-12-22 and records 5 to 9 on 2017-12-21, all at
     * offset +00:00. Record 2 has an empty {@code data} string.
     *
     * @param schema schema used to create each record
     * @return a new list holding the ten records in id order
     */
    private List<Record> testRecords(Schema schema) {
        return Lists.newArrayList(
                record(schema, 0L, parse("2017-12-22T09:20:44.294658+00:00"), "junction"),
                record(schema, 1L, parse("2017-12-22T07:15:34.582910+00:00"), "alligator"),
                record(schema, 2L, parse("2017-12-22T06:02:09.243857+00:00"), ""),
                record(schema, 3L, parse("2017-12-22T03:10:11.134509+00:00"), "clapping"),
                record(schema, 4L, parse("2017-12-22T00:34:00.184671+00:00"), "brush"),
                record(schema, 5L, parse("2017-12-21T22:20:08.935889+00:00"), "trap"),
                record(schema, 6L, parse("2017-12-21T21:55:30.589712+00:00"), "element"),
                record(schema, 7L, parse("2017-12-21T17:31:14.532797+00:00"), "limited"),
                record(schema, 8L, parse("2017-12-21T15:21:51.237521+00:00"), "global"),
                record(schema, 9L, parse("2017-12-21T15:02:15.230570+00:00"), "goldfish"));
    }

    /**
     * Reads every column of the table at the given location, keeping only rows that match
     * the filter expression.
     *
     * @param table table location
     * @param vectorized value for the {@code vectorization-enabled} read option
     * @param expr filter expression
     * @return the matching rows
     */
    private static List<Row> read(String table, boolean vectorized, String expr) {
        final String allColumns = "*";
        return read(table, vectorized, expr, allColumns);
    }

    /**
     * Loads the table at the given location with the "iceberg" format, applies the filter
     * expression, selects the requested columns, and collects the result.
     *
     * @param table table location
     * @param vectorized value for the {@code vectorization-enabled} read option
     * @param expr filter expression applied before the projection
     * @param select0 first column to select
     * @param selectN remaining columns to select, possibly none
     * @return the collected rows
     */
    private static List<Row> read(String table, boolean vectorized, String expr, String select0, String ... selectN) {
        Dataset<Row> dataset = spark.read()
                .format("iceberg")
                .option("vectorization-enabled", String.valueOf(vectorized))
                .load(table)
                .filter(expr)
                .select(select0, selectN);
        return dataset.collectAsList();
    }

    /**
     * Parses a timestamp string with an offset into an {@link OffsetDateTime}.
     *
     * @param timestamp text accepted by {@link OffsetDateTime#parse(CharSequence)}
     * @return the parsed date-time
     */
    private static OffsetDateTime parse(String timestamp) {
        final OffsetDateTime parsed = OffsetDateTime.parse(timestamp);
        return parsed;
    }

    /**
     * Creates a record for the schema and sets the given values by position.
     *
     * @param schema schema used to create the record
     * @param values values assigned to positions 0, 1, 2, ... in the order given
     * @return the populated record
     */
    private static Record record(Schema schema, Object ... values) {
        GenericRecord rec = GenericRecord.create(schema);
        int position = 0;
        for (Object value : values) {
            rec.set(position, value);
            position++;
        }
        return rec;
    }
}

