/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.TestFlinkScan;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Runs the scan scenarios inherited from {@link TestFlinkScan} through an {@link IcebergSource}
 * built with the DataStream API.
 *
 * <p>Every {@code run*} override funnels into
 * {@link #run(Schema, List, Map, String, String...)}, which builds the source, executes the job
 * with {@code executeAndCollect()} and returns all produced rows.
 */
@RunWith(Parameterized.class)
public class TestIcebergSourceBounded extends TestFlinkScan {

    /**
     * @param fileFormat file format name supplied by the parameterized runner and forwarded to
     *     {@link TestFlinkScan}
     */
    public TestIcebergSourceBounded(String fileFormat) {
        super(fileFormat);
    }

    /**
     * Reads the test table restricted to the given columns.
     *
     * @param projected names of the columns to keep, in the desired output order
     * @return all rows produced by the source, containing only the projected columns
     * @throws NoSuchElementException if a requested column does not exist in the table schema
     * @throws Exception if loading the table or executing the job fails
     */
    @Override
    protected List<Row> runWithProjection(String... projected) throws Exception {
        Schema icebergTableSchema =
            this.catalogResource.catalog().loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
        TableSchema fullFlinkSchema =
            FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTableSchema));

        // Build a Flink schema holding only the requested columns, then map it back to an
        // Iceberg schema so it can be handed to the source builder as the projection.
        TableSchema.Builder builder = TableSchema.builder();
        for (String field : projected) {
            TableColumn column = fullFlinkSchema.getTableColumn(field)
                .orElseThrow(() -> new NoSuchElementException(
                    "Cannot project unknown column '" + field + "'; available fields: "
                        + String.join(", ", fullFlinkSchema.getFieldNames())));
            builder.field(column.getName(), column.getType());
        }
        TableSchema flinkSchema = builder.build();
        Schema projectedSchema = FlinkSchemaUtil.convert(icebergTableSchema, flinkSchema);
        return run(projectedSchema, Lists.newArrayList(), Maps.newHashMap(), "", projected);
    }

    /**
     * Reads the test table with a single filter expression applied.
     *
     * @param filter Iceberg expression passed to the source builder
     * @param sqlFilter SQL form of the filter; forwarded to {@code run} but not used there
     * @param caseSensitive value stored under the {@code "case-sensitive"} source property
     * @return all rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override
    protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
            throws Exception {
        Map<String, String> options = Maps.newHashMap();
        options.put("case-sensitive", Boolean.toString(caseSensitive));
        return run(null, Collections.singletonList(filter), options, sqlFilter, "*");
    }

    /**
     * Reads the whole test table with the given source properties and no projection or filter.
     *
     * @param options properties passed to the source builder
     * @return all rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override
    protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
        return run(null, Lists.newArrayList(), options, "", "*");
    }

    /**
     * Reads the whole test table with no projection, filters or extra properties.
     *
     * @return all rows produced by the source
     * @throws Exception if loading the table or executing the job fails
     */
    @Override
    protected List<Row> run() throws Exception {
        return run(null, Lists.newArrayList(), Maps.newHashMap(), "", "*");
    }

    /**
     * Builds an {@link IcebergSource} for the test table, executes it and collects the output.
     *
     * @param projectedSchema schema to project, or {@code null} to read with the table's schema
     * @param filters filter expressions passed to the source builder
     * @param options properties passed to the source builder
     * @param sqlFilter not used by this implementation (NOTE(review): presumably kept so
     *     subclasses running through SQL can override with the same signature; confirm)
     * @param sqlSelectedFields not used by this implementation (same caveat as {@code sqlFilter})
     * @return all rows produced by the job, converted from {@link RowData} to {@link Row}
     * @throws Exception if loading the table or executing the job fails
     */
    protected List<Row> run(
            Schema projectedSchema,
            List<Expression> filters,
            Map<String, String> options,
            String sqlFilter,
            String... sqlSelectedFields) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Configuration config = new Configuration();
        config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);

        // This loader is only used to resolve the table and is closed right away.
        Table table;
        try (TableLoader tableLoader = tableLoader()) {
            tableLoader.open();
            table = tableLoader.loadTable();
        }

        // tableLoader() is called again here instead of reusing the loader closed above.
        IcebergSource.Builder<RowData> sourceBuilder = IcebergSource.forRowData()
            .tableLoader(tableLoader())
            .table(table)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .flinkConfig(config);
        if (projectedSchema != null) {
            sourceBuilder.project(projectedSchema);
        }
        sourceBuilder.filters(filters);
        sourceBuilder.properties(options);

        // The mapper must use the same schema the source emits: the projection if one was
        // given, otherwise the table's full schema.
        Schema readSchema = projectedSchema == null ? table.schema() : projectedSchema;
        RowType rowType = FlinkSchemaUtil.convert(readSchema);

        SingleOutputStreamOperator<Row> stream = env
            .fromSource(
                sourceBuilder.build(),
                WatermarkStrategy.noWatermarks(),
                "testBasicRead",
                TypeInformation.of(RowData.class))
            .map(new RowDataToRowMapper(rowType));

        try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
            return Lists.newArrayList(iter);
        }
    }
}

