/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TestIcebergSourceBoundedGenericRecord {
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    @Rule
    public final HadoopCatalogResource catalogResource = new HadoopCatalogResource(TEMPORARY_FOLDER, "default", "t");
    private final FileFormat fileFormat;
    private final int parallelism;

    @Parameterized.Parameters(name="format={0}, parallelism = {1}")
    public static Object[][] parameters() {
        return new Object[][]{{"avro", 2}, {"parquet", 2}, {"orc", 2}};
    }

    public TestIcebergSourceBoundedGenericRecord(String format, int parallelism) {
        this.fileFormat = FileFormat.valueOf((String)format.toUpperCase(Locale.ENGLISH));
        this.parallelism = parallelism;
    }

    @Test
    public void testUnpartitionedTable() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
        List expectedRecords = RandomGenericData.generate((Schema)TestFixtures.SCHEMA, (int)2, (long)0L);
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords);
        TestHelpers.assertRecords(this.run(), expectedRecords, TestFixtures.SCHEMA);
    }

    @Test
    public void testPartitionedTable() throws Exception {
        String dateStr = "2020-03-20";
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
        List expectedRecords = RandomGenericData.generate((Schema)TestFixtures.SCHEMA, (int)2, (long)0L);
        for (int i = 0; i < expectedRecords.size(); ++i) {
            ((Record)expectedRecords.get(i)).setField("dt", (Object)dateStr);
        }
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER).appendToTable((StructLike)TestHelpers.Row.of((Object[])new Object[]{dateStr, 0}), expectedRecords);
        TestHelpers.assertRecords(this.run(), expectedRecords, TestFixtures.SCHEMA);
    }

    @Test
    public void testProjection() throws Exception {
        Table table = this.catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
        List expectedRecords = RandomGenericData.generate((Schema)TestFixtures.SCHEMA, (int)2, (long)0L);
        new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER).appendToTable((StructLike)TestHelpers.Row.of((Object[])new Object[]{"2020-03-20", 0}), expectedRecords);
        Schema projectedSchema = TypeUtil.select((Schema)TestFixtures.SCHEMA, (Set)Sets.newHashSet((Object[])new Integer[]{1}));
        List<Row> expectedRows = Arrays.asList(Row.of((Object[])new Object[]{((Record)expectedRecords.get(0)).get(0)}), Row.of((Object[])new Object[]{((Record)expectedRecords.get(1)).get(0)}));
        TestHelpers.assertRows(this.run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), expectedRows);
    }

    private List<Row> run() throws Exception {
        return this.run(null, Collections.emptyList(), Collections.emptyMap());
    }

    private List<Row> run(Schema projectedSchema, List<Expression> filters, Map<String, String> options) throws Exception {
        Table table;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(this.parallelism);
        env.getConfig().enableObjectReuse();
        Configuration config = new Configuration();
        config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
        try (TableLoader tableLoader = this.catalogResource.tableLoader();){
            tableLoader.open();
            table = tableLoader.loadTable();
        }
        AvroGenericRecordReaderFunction readerFunction = new AvroGenericRecordReaderFunction(TestFixtures.TABLE_IDENTIFIER.name(), (ReadableConfig)new Configuration(), table.schema(), null, null, false, table.io(), table.encryption(), filters);
        IcebergSource.Builder sourceBuilder = IcebergSource.builder().tableLoader(this.catalogResource.tableLoader()).readerFunction((ReaderFunction)readerFunction).assignerFactory((SplitAssignerFactory)new SimpleSplitAssignerFactory()).flinkConfig((ReadableConfig)config);
        if (projectedSchema != null) {
            sourceBuilder.project(projectedSchema);
        }
        sourceBuilder.filters(filters);
        sourceBuilder.setAll(options);
        Schema readSchema = projectedSchema != null ? projectedSchema : table.schema();
        RowType rowType = FlinkSchemaUtil.convert((Schema)readSchema);
        org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert((Schema)readSchema, (String)TestFixtures.TABLE_IDENTIFIER.name());
        SingleOutputStreamOperator stream = env.fromSource((Source)sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "testBasicRead", (TypeInformation)new GenericRecordAvroTypeInfo(avroSchema)).map((MapFunction)AvroGenericRecordToRowDataMapper.forAvroSchema((org.apache.avro.Schema)avroSchema)).map((MapFunction)new RowDataToRowMapper(rowType));
        try (CloseableIterator iter = stream.executeAndCollect();){
            ArrayList arrayList = Lists.newArrayList((Iterator)iter);
            return arrayList;
        }
    }
}

