/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.SqlFileBasedSource;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SqlFileBasedSource}.
 *
 * <p>Before every test a parquet file with {@link #NUM_RECORDS} generated inserts is written
 * under {@code basePath/parquetFiles} and registered as the temp view {@code test_sql_table}.
 * Each test then copies a SQL file from the {@code streamer-config/} test resources to DFS,
 * points the source at it and checks what the source returns.
 */
public class TestSqlFileBasedSource extends UtilitiesTestBase {
    /** Flag handed to the data generator when producing the test inserts. */
    private static final boolean USE_FLATTENED_SCHEMA = false;
    /** Property naming the SQL file the source should execute. */
    private static final String SQL_FILE_SOURCE_CONFIG = "hoodie.streamer.source.sql.file";
    /** Property that switches on checkpoint emission by the source. */
    private static final String SQL_FILE_SOURCE_EMIT_CHECKPOINT_CONFIG = "hoodie.streamer.source.sql.checkpoint.emit";
    /** Test-resource directory holding the SQL files. */
    private static final String SQL_RESOURCE_DIR = "streamer-config/";
    /** SQL file whose query is expected to return every generated record. */
    private static final String VALID_SQL_FILE = "sql-file-based-source.sql";
    /** SQL file whose query is expected to fail analysis. */
    private static final String INVALID_TABLE_SQL_FILE = "sql-file-based-source-invalid-table.sql";
    /** Number of records written to the test table and expected back from the source. */
    private static final int NUM_RECORDS = 10000;

    protected FilebasedSchemaProvider schemaProvider;
    protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    private String dfsRoot;
    private TypedProperties props;
    private SqlFileBasedSource sqlFileSource;
    private SourceFormatAdapter sourceFormatAdapter;

    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices(false, false, false);
    }

    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanUpUtilitiesTestServices();
    }

    /**
     * Creates the parquet directory, resets the properties, runs the base-class setup and
     * then writes the test table.
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        this.dfsRoot = UtilitiesTestBase.basePath + "/parquetFiles";
        UtilitiesTestBase.fs.mkdirs(new Path(this.dfsRoot));
        this.props = new TypedProperties();
        super.setup();
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), jsc);
        this.generateTestTable("1", "001", NUM_RECORDS);
    }

    @Override
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
    }

    /**
     * Writes {@code n} generated inserts as a parquet file named {@code filename} under the
     * DFS root and (re)registers the whole directory as the temp view {@code test_sql_table}.
     *
     * @param filename    name of the parquet file to create below the DFS root
     * @param instantTime instant time passed to the data generator
     * @param n           number of insert records to generate
     * @throws IOException if the parquet file cannot be written
     */
    private void generateTestTable(String filename, String instantTime, int n) throws IOException {
        Path parquetPath = new Path(this.dfsRoot, filename);
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(
                this.dataGenerator.generateInserts(instantTime, n, USE_FLATTENED_SCHEMA)),
            parquetPath);
        sparkSession.read().parquet(this.dfsRoot).createOrReplaceTempView("test_sql_table");
    }

    /**
     * Copies the given SQL test resource to DFS, points {@link #SQL_FILE_SOURCE_CONFIG} at the
     * copy and builds a source from the current properties. Any additional property a test
     * needs must therefore be set on {@code props} before calling this method.
     *
     * @param sqlFileName file name of the SQL resource inside {@code streamer-config/}
     * @return a new source configured with the current {@code props}
     * @throws IOException if the resource cannot be copied
     */
    private SqlFileBasedSource createSource(String sqlFileName) throws IOException {
        String dfsSqlPath = UtilitiesTestBase.basePath + "/" + sqlFileName;
        UtilitiesTestBase.Helpers.copyToDFS(SQL_RESOURCE_DIR + sqlFileName, storage, dfsSqlPath);
        this.props.setProperty(SQL_FILE_SOURCE_CONFIG, dfsSqlPath);
        return new SqlFileBasedSource(this.props, jsc, sparkSession, this.schemaProvider);
    }

    /**
     * Builds the source for {@code sqlFileName} and wraps it in a {@link SourceFormatAdapter},
     * storing both in the corresponding fields.
     *
     * @param sqlFileName file name of the SQL resource inside {@code streamer-config/}
     * @throws IOException if the resource cannot be copied
     */
    private void initSourceAndAdapter(String sqlFileName) throws IOException {
        this.sqlFileSource = this.createSource(sqlFileName);
        this.sourceFormatAdapter = new SourceFormatAdapter(this.sqlFileSource);
    }

    /** Fetches in Avro format, converts the batch back to rows and expects every record. */
    @Test
    public void testSqlFileBasedSourceAvroFormat() throws IOException {
        this.initSourceAndAdapter(VALID_SQL_FILE);
        // Calls are chained so the element types flow through without raw types or casts.
        Dataset<?> fetch1Rows = AvroConversionUtils.createDataFrame(
            JavaRDD.toRDD(
                this.sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch().get()),
            this.schemaProvider.getSourceSchema().toString(),
            sparkSession);
        Assertions.assertEquals(NUM_RECORDS, fetch1Rows.count());
    }

    /** Fetches in row format and expects every record. */
    @Test
    public void testSqlFileBasedSourceRowFormat() throws IOException {
        this.initSourceAndAdapter(VALID_SQL_FILE);
        long rowCount = this.sourceFormatAdapter
            .fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE)
            .getBatch()
            .get()
            .count();
        Assertions.assertEquals(NUM_RECORDS, rowCount);
    }

    /**
     * Fetches with a source limit (1000) smaller than the table and still expects every
     * record to be returned.
     */
    @Test
    public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() throws IOException {
        this.initSourceAndAdapter(VALID_SQL_FILE);
        long rowCount = this.sourceFormatAdapter
            .fetchNewDataInRowFormat(Option.empty(), 1000L)
            .getBatch()
            .get()
            .count();
        Assertions.assertEquals(NUM_RECORDS, rowCount);
    }

    /** Expects an {@link AnalysisException} when fetching with the invalid-table SQL file. */
    @Test
    public void testSqlFileBasedSourceInvalidTable() throws IOException {
        this.initSourceAndAdapter(INVALID_TABLE_SQL_FILE);
        Assertions.assertThrows(
            AnalysisException.class,
            () -> this.sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE));
    }

    /**
     * With checkpoint emission enabled, expects every record plus a checkpoint that parses as
     * a millisecond timestamp lying strictly before, and less than 60 seconds before, the
     * current time.
     */
    @Test
    public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() throws IOException {
        // Must be set before the source is constructed from the properties.
        this.props.setProperty(SQL_FILE_SOURCE_EMIT_CHECKPOINT_CONFIG, "true");
        this.sqlFileSource = this.createSource(VALID_SQL_FILE);

        Pair<? extends Option<? extends Dataset<?>>, String> nextBatch =
            this.sqlFileSource.fetchNextBatch(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(NUM_RECORDS, nextBatch.getLeft().get().count());

        long currentTimeInMillis = System.currentTimeMillis();
        long checkpointToBeUsed = Long.parseLong(nextBatch.getRight());
        Assertions.assertTrue(
            (currentTimeInMillis - checkpointToBeUsed) / 1000L < 60L,
            "checkpoint should be less than 60 seconds old");
        Assertions.assertTrue(
            currentTimeInMillis > checkpointToBeUsed,
            "checkpoint should precede the current time");
    }
}

