/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.data;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(value=Parameterized.class)
public class TestSparkParquetReadMetadataColumns {
    private static final Schema DATA_SCHEMA;
    private static final Schema PROJECTION_SCHEMA;
    private static final int NUM_ROWS = 1000;
    private static final List<InternalRow> DATA_ROWS;
    private static final List<InternalRow> EXPECTED_ROWS;
    private static final int NUM_ROW_GROUPS = 10;
    private static final int ROWS_PER_SPLIT = 100;
    private static final int RECORDS_PER_BATCH = 10;
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();
    private final boolean vectorized;
    private File testFile;

    @Parameterized.Parameters(name="vectorized = {0}")
    public static Object[][] parameters() {
        return new Object[][]{{false}, {true}};
    }

    public TestSparkParquetReadMetadataColumns(boolean vectorized) {
        this.vectorized = vectorized;
    }

    @Before
    public void writeFile() throws IOException {
        ArrayList fileSplits = Lists.newArrayList();
        StructType struct = SparkSchemaUtil.convert((Schema)DATA_SCHEMA);
        Configuration conf = new Configuration();
        this.testFile = this.temp.newFile();
        Assert.assertTrue((String)"Delete should succeed", (boolean)this.testFile.delete());
        ParquetFileWriter parquetFileWriter = new ParquetFileWriter(conf, ParquetSchemaUtil.convert((Schema)DATA_SCHEMA, (String)"testSchema"), new Path(this.testFile.getAbsolutePath()));
        parquetFileWriter.start();
        for (int i = 0; i < 10; ++i) {
            File split = this.temp.newFile();
            Assert.assertTrue((String)"Delete should succeed", (boolean)split.delete());
            fileSplits.add(new Path(split.getAbsolutePath()));
            try (FileAppender writer = Parquet.write((OutputFile)Files.localOutput((File)split)).createWriterFunc(msgType -> SparkParquetWriters.buildWriter((StructType)struct, (MessageType)msgType)).schema(DATA_SCHEMA).overwrite().build();){
                writer.addAll(DATA_ROWS.subList(i * 100, (i + 1) * 100));
            }
            parquetFileWriter.appendFile((org.apache.parquet.io.InputFile)HadoopInputFile.fromPath((Path)new Path(split.getAbsolutePath()), (Configuration)conf));
        }
        parquetFileWriter.end(ParquetFileWriter.mergeMetadataFiles((List)fileSplits, (Configuration)conf).getFileMetaData().getKeyValueMetaData());
    }

    @Test
    public void testReadRowNumbers() throws IOException {
        this.readAndValidate(null, null, null, EXPECTED_ROWS);
    }

    @Test
    public void testReadRowNumbersWithDelete() throws IOException {
        Assume.assumeTrue((boolean)this.vectorized);
        ArrayList expectedRowsAfterDelete = Lists.newArrayList();
        EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy()));
        for (int i = 98; i <= 102; ++i) {
            ((InternalRow)expectedRowsAfterDelete.get(i)).update(3, (Object)true);
        }
        Parquet.ReadBuilder builder = Parquet.read((InputFile)Files.localInput((File)this.testFile)).project(PROJECTION_SCHEMA);
        DeleteFilter deleteFilter = (DeleteFilter)Mockito.mock(DeleteFilter.class);
        Mockito.when((Object)deleteFilter.hasPosDeletes()).thenReturn((Object)true);
        CustomizedPositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
        deletedRowPos.delete(98L, 103L);
        Mockito.when((Object)deleteFilter.deletedRowPositions()).thenReturn((Object)deletedRowPos);
        builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader((Schema)PROJECTION_SCHEMA, (MessageType)fileSchema, (Map)Maps.newHashMap(), (DeleteFilter)deleteFilter));
        builder.recordsPerBatch(10);
        this.validate(expectedRowsAfterDelete, builder);
    }

    @Test
    public void testReadRowNumbersWithFilter() throws IOException {
        for (int i = 1; i < 5; ++i) {
            this.readAndValidate(Expressions.and((Expression)Expressions.lessThan((String)"id", (Object)500), (Expression)Expressions.greaterThanOrEqual((String)"id", (Object)(i * 100))), null, null, EXPECTED_ROWS.subList(i * 100, 500));
        }
    }

    @Test
    public void testReadRowNumbersWithSplits() throws IOException {
        ParquetFileReader fileReader = new ParquetFileReader((org.apache.parquet.io.InputFile)HadoopInputFile.fromPath((Path)new Path(this.testFile.getAbsolutePath()), (Configuration)new Configuration()), ParquetReadOptions.builder().build());
        List rowGroups = fileReader.getRowGroups();
        for (int i = 0; i < 10; ++i) {
            this.readAndValidate(null, ((ColumnChunkMetaData)((BlockMetaData)rowGroups.get(i)).getColumns().get(0)).getStartingPos(), ((BlockMetaData)rowGroups.get(i)).getCompressedSize(), EXPECTED_ROWS.subList(i * 100, (i + 1) * 100));
        }
    }

    private void readAndValidate(Expression filter, Long splitStart, Long splitLength, List<InternalRow> expected) throws IOException {
        Parquet.ReadBuilder builder = Parquet.read((InputFile)Files.localInput((File)this.testFile)).project(PROJECTION_SCHEMA);
        if (this.vectorized) {
            builder.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader((Schema)PROJECTION_SCHEMA, (MessageType)fileSchema, (Map)Maps.newHashMap(), null));
            builder.recordsPerBatch(10);
        } else {
            builder = builder.createReaderFunc(msgType -> SparkParquetReaders.buildReader((Schema)PROJECTION_SCHEMA, (MessageType)msgType));
        }
        if (filter != null) {
            builder = builder.filter(filter);
        }
        if (splitStart != null && splitLength != null) {
            builder = builder.split(splitStart.longValue(), splitLength.longValue());
        }
        this.validate(expected, builder);
    }

    private void validate(List<InternalRow> expected, Parquet.ReadBuilder builder) throws IOException {
        try (CloseableIterable<InternalRow> reader = this.vectorized ? this.batchesToRows((CloseableIterable<ColumnarBatch>)builder.build()) : builder.build();){
            CloseableIterator actualRows = reader.iterator();
            for (InternalRow internalRow : expected) {
                Assert.assertTrue((String)"Should have expected number of rows", (boolean)actualRows.hasNext());
                TestHelpers.assertEquals(PROJECTION_SCHEMA, internalRow, actualRows.next());
            }
            Assert.assertFalse((String)"Should not have extra rows", (boolean)actualRows.hasNext());
        }
    }

    private CloseableIterable<InternalRow> batchesToRows(CloseableIterable<ColumnarBatch> batches) {
        return CloseableIterable.combine((Iterable)Iterables.concat((Iterable)Iterables.transform(batches, b -> () -> ((ColumnarBatch)b).rowIterator())), batches);
    }

    static {
        GenericInternalRow row;
        long i;
        DATA_SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)100, (String)"id", (Type)Types.LongType.get()), Types.NestedField.required((int)101, (String)"data", (Type)Types.StringType.get())});
        PROJECTION_SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.required((int)100, (String)"id", (Type)Types.LongType.get()), Types.NestedField.required((int)101, (String)"data", (Type)Types.StringType.get()), MetadataColumns.ROW_POSITION, MetadataColumns.IS_DELETED});
        DATA_ROWS = Lists.newArrayListWithCapacity((int)1000);
        for (i = 0L; i < 1000L; ++i) {
            row = new GenericInternalRow(DATA_SCHEMA.columns().size());
            if (i >= 500L) {
                row.update(0, (Object)(2L * i));
            } else {
                row.update(0, (Object)i);
            }
            row.update(1, (Object)UTF8String.fromString((String)("str" + i)));
            DATA_ROWS.add((InternalRow)row);
        }
        EXPECTED_ROWS = Lists.newArrayListWithCapacity((int)1000);
        for (i = 0L; i < 1000L; ++i) {
            row = new GenericInternalRow(PROJECTION_SCHEMA.columns().size());
            if (i >= 500L) {
                row.update(0, (Object)(2L * i));
            } else {
                row.update(0, (Object)i);
            }
            row.update(1, (Object)UTF8String.fromString((String)("str" + i)));
            row.update(2, (Object)i);
            row.update(3, (Object)false);
            EXPECTED_ROWS.add((InternalRow)row);
        }
    }

    private class CustomizedPositionDeleteIndex
    implements PositionDeleteIndex {
        private final Set<Long> deleteIndex = Sets.newHashSet();

        private CustomizedPositionDeleteIndex() {
        }

        public void delete(long position) {
            this.deleteIndex.add(position);
        }

        public void delete(long posStart, long posEnd) {
            for (long l = posStart; l < posEnd; ++l) {
                this.delete(l);
            }
        }

        public boolean isDeleted(long position) {
            return this.deleteIndex.contains(position);
        }

        public boolean isEmpty() {
            return this.deleteIndex.isEmpty();
        }
    }
}

