/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.FlinkTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests the parallelism of the data streams produced by {@link DataTableSource} and
 * {@link SystemTableSource} scan runtime providers.
 *
 * <p>Each test creates a two-column table ({@code a INT}, {@code b BIGINT}) under a JUnit-managed
 * temporary directory.
 */
class DataTableSourceTest {
    /** Temporary directory injected by JUnit; used as the table location. */
    @TempDir
    java.nio.file.Path path;

    /**
     * Checks the source parallelism of a {@code bucket = 1} table holding one committed row, first
     * with a default local environment and then with the infer-scan-parallelism option set to
     * {@code "false"} through the {@code "paimon."}-prefixed environment configuration.
     */
    @Test
    void testInferScanParallelism() throws Exception {
        FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket", "1"));
        writeData(fileStoreTable);
        DataTableSource tableSource =
                new DataTableSource(
                        ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null, null);
        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);

        // Default environment: the produced source is expected to run with parallelism 1.
        StreamExecutionEnvironment defaultEnv = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<?> inferredStream =
                runtimeProvider.produceDataStream(s -> Optional.empty(), defaultEnv);
        AssertionsForClassTypes.assertThat(inferredStream.getParallelism()).isEqualTo(1);

        // Option set to "false": the source is expected to take the environment's parallelism.
        // NOTE(review): the isNotEqualTo(1) check presumes the local environment's default
        // parallelism is greater than 1 - confirm for single-core build machines.
        String optionKey =
                String.format("%s%s", "paimon.", FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
        StreamExecutionEnvironment noInferEnv =
                StreamExecutionEnvironment.createLocalEnvironment(
                        Configuration.fromMap(Collections.singletonMap(optionKey, "false")));
        DataStream<?> plainStream =
                runtimeProvider.produceDataStream(s -> Optional.empty(), noInferEnv);
        AssertionsForClassTypes.assertThat(plainStream.getParallelism()).isNotEqualTo(1);
        AssertionsForClassTypes.assertThat(plainStream.getParallelism())
                .isEqualTo(noInferEnv.getParallelism());
    }

    /**
     * Checks that a {@code bucket = -1} table with no data written yields a source with
     * parallelism 1 in a default local environment.
     */
    @Test
    public void testInferStreamParallelism() throws Exception {
        FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket", "-1"));
        DataTableSource tableSource =
                new DataTableSource(
                        ObjectIdentifier.of("cat", "db", "table"), fileStoreTable, true, null, null);
        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<?> sourceStream = runtimeProvider.produceDataStream(s -> Optional.empty(), env);
        AssertionsForClassTypes.assertThat(sourceStream.getParallelism()).isEqualTo(1);
    }

    /**
     * Checks that a {@link SystemTableSource} over a {@link ReadOptimizedTable} uses the
     * {@code scan.parallelism = 3} table option, with the environment configured for batch
     * execution.
     */
    @Test
    public void testSystemTableParallelism() throws Exception {
        FileStoreTable fileStoreTable =
                createTable(ImmutableMap.of("bucket", "1", "scan.parallelism", "3"));
        ReadOptimizedTable ro = new ReadOptimizedTable(fileStoreTable);
        SystemTableSource tableSource =
                new SystemTableSource(ro, false, ObjectIdentifier.of("cat", "db", "table"));
        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        // The batch configuration must be handed to the environment; previously it was built
        // and then silently dropped.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(configuration);
        DataStream<?> sourceStream = runtimeProvider.produceDataStream(s -> Optional.empty(), env);
        AssertionsForClassTypes.assertThat(sourceStream.getParallelism()).isEqualTo(3);
    }

    /**
     * Obtains the scan runtime provider of {@code tableSource} using a stub scan context.
     *
     * @param tableSource the source under test
     * @return the provider returned by the source, cast to {@link PaimonDataStreamScanProvider}
     */
    private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) {
        // Every context method throws, so any use of the context by the source fails the test.
        ScanTableSource.ScanContext stubContext =
                new ScanTableSource.ScanContext() {
                    @Override
                    public <T> TypeInformation<T> createTypeInformation(
                            org.apache.flink.table.types.DataType dataType) {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public <T> TypeInformation<T> createTypeInformation(LogicalType logicalType) {
                        throw new UnsupportedOperationException();
                    }

                    @Override
                    public DynamicTableSource.DataStructureConverter createDataStructureConverter(
                            org.apache.flink.table.types.DataType dataType) {
                        throw new UnsupportedOperationException();
                    }
                };
        return (PaimonDataStreamScanProvider) tableSource.getScanRuntimeProvider(stubContext);
    }

    /**
     * Creates a table with columns {@code a INT} and {@code b BIGINT} in the temporary directory.
     *
     * @param options table options stored in the schema
     * @return the table built from the newly created schema
     * @throws Exception if the schema cannot be created
     */
    private FileStoreTable createTable(Map<String, String> options) throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();
        Path tablePath = new Path(path.toString());
        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
        Schema schema =
                Schema.newBuilder()
                        .column("a", DataTypes.INT())
                        .column("b", DataTypes.BIGINT())
                        .options(options)
                        .build();
        TableSchema tableSchema = schemaManager.createTable(schema);
        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
    }

    /**
     * Writes the single row {@code (1, 2L)} to {@code table} and commits it.
     *
     * @param table the table to write to
     * @throws Exception if writing, committing or closing fails
     */
    private void writeData(FileStoreTable table) throws Exception {
        // Resources close in reverse declaration order: commit first, then writer. This keeps the
        // original close order while also releasing both when a write or commit fails.
        try (TableWriteImpl<?> writer = table.newWrite("test");
                TableCommitImpl commit = table.newCommit("test")) {
            writer.write(GenericRow.of(1, 2L));
            commit.commit(writer.prepareCommit());
        }
    }
}

