/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestSqlSource
extends UtilitiesTestBase {
    private final boolean useFlattenedSchema = false;
    private final String sqlSourceConfig = "hoodie.streamer.source.sql.sql.query";
    protected FilebasedSchemaProvider schemaProvider;
    protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    private String dfsRoot;
    private TypedProperties props;
    private SqlSource sqlSource;
    private SourceFormatAdapter sourceFormatAdapter;

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices();
    }

    @AfterAll
    public static void cleanupClass() throws IOException {
        UtilitiesTestBase.cleanUpUtilitiesTestServices();
    }

    @Override
    @BeforeEach
    public void setup() throws Exception {
        this.dfsRoot = UtilitiesTestBase.basePath + "/parquetFiles";
        UtilitiesTestBase.fs.mkdirs(new Path(this.dfsRoot));
        this.props = new TypedProperties();
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), jsc);
        this.generateTestTable("1", "001", 10000);
    }

    @Override
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
    }

    private void generateTestTable(String filename, String instantTime, int n) throws IOException {
        Path path = new Path(this.dfsRoot, filename);
        UtilitiesTestBase.Helpers.saveParquetToDFS(UtilitiesTestBase.Helpers.toGenericRecords(this.dataGenerator.generateInserts(instantTime, Integer.valueOf(n), false)), path);
        sparkSession.read().parquet(this.dfsRoot).createOrReplaceTempView("test_sql_table");
    }

    @Test
    public void testSqlSourceAvroFormat() throws IOException {
        this.props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table");
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter((Source)this.sqlSource);
        InputBatch fetch1 = this.sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Dataset fetch1Rows = AvroConversionUtils.createDataFrame((RDD)JavaRDD.toRDD((JavaRDD)((JavaRDD)fetch1.getBatch().get())), (String)this.schemaProvider.getSourceSchema().toString(), (SparkSession)sparkSession);
        Assertions.assertEquals((long)10000L, (long)fetch1Rows.count());
    }

    @Test
    public void testSqlSourceRowFormat() throws IOException {
        this.props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table");
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter((Source)this.sqlSource);
        InputBatch fetch1AsRows = this.sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals((long)10000L, (long)((Dataset)fetch1AsRows.getBatch().get()).count());
    }

    @Test
    public void testSqlSourceCheckpoint() throws IOException {
        this.props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table where 1=0");
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter((Source)this.sqlSource);
        InputBatch fetch1AsRows = this.sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertNull((Object)fetch1AsRows.getCheckpointForNextBatch());
    }

    @Test
    public void testSqlSourceMoreRecordsThanSourceLimit() throws IOException {
        this.props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table");
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter((Source)this.sqlSource);
        InputBatch fetch1AsRows = this.sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), 1000L);
        Assertions.assertEquals((long)10000L, (long)((Dataset)fetch1AsRows.getBatch().get()).count());
    }

    @Test
    public void testSqlSourceZeroRecord() throws IOException {
        this.props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table where 1=0");
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter((Source)this.sqlSource);
        InputBatch fetch1AsRows = this.sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals((long)0L, (long)((Dataset)fetch1AsRows.getBatch().get()).count());
    }

    @Test
    public void testSqlSourceInvalidTable() throws IOException {
        this.props.setProperty("hoodie.streamer.source.sql.sql.query", "select * from not_exist_sql_table");
        this.sqlSource = new SqlSource(this.props, jsc, sparkSession, (SchemaProvider)this.schemaProvider);
        this.sourceFormatAdapter = new SourceFormatAdapter((Source)this.sqlSource);
        Assertions.assertThrows(AnalysisException.class, () -> this.sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE));
    }
}

