/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.config.SqlFileBasedSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Row source that reads SQL statements from a file, runs each of them through
 * the Spark session, and hands back the dataset produced by the last statement.
 *
 * <p>The file location comes from {@code SqlFileBasedSourceConfig.SOURCE_SQL_FILE}
 * (required). Statements are separated by {@code ';'}; the split is purely textual,
 * so a semicolon inside a string literal or a comment also ends a statement.
 *
 * <p>When {@code SqlFileBasedSourceConfig.EMIT_EPOCH_CHECKPOINT} resolves to
 * {@code true}, each fetch returns a checkpoint holding the current epoch time in
 * milliseconds; otherwise the checkpoint half of the returned pair is {@code null}.
 */
public class SqlFileBasedSource
extends RowSource {
    private static final Logger LOG = LoggerFactory.getLogger(SqlFileBasedSource.class);
    /** Path of the SQL file, as configured. */
    private final String sourceSqlFile;
    /** Whether {@link #fetchNextBatch} attaches an epoch-millis checkpoint to its result. */
    private final boolean shouldEmitCheckPoint;

    /**
     * Creates the source and reads its settings from {@code props}.
     *
     * @param props          configuration; must contain the source SQL file property
     * @param sparkContext   Spark context, also used for the Hadoop configuration when opening the file
     * @param sparkSession   session used to execute the SQL statements
     * @param schemaProvider schema provider forwarded to the parent class
     */
    public SqlFileBasedSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(SqlFileBasedSourceConfig.SOURCE_SQL_FILE));
        this.sourceSqlFile = ConfigUtils.getStringWithAltKeys(props, SqlFileBasedSourceConfig.SOURCE_SQL_FILE);
        this.shouldEmitCheckPoint = ConfigUtils.getBooleanWithAltKeys(props, SqlFileBasedSourceConfig.EMIT_EPOCH_CHECKPOINT);
    }

    /**
     * Executes every non-empty statement of the SQL file in order and returns the
     * dataset of the last one.
     *
     * <p>Neither {@code lastCheckpoint} nor {@code sourceLimit} is consulted; the
     * whole file is executed on every call.
     *
     * @param lastCheckpoint previous checkpoint (unused)
     * @param sourceLimit    requested batch size limit (unused)
     * @return the last statement's dataset paired with an epoch-millis checkpoint,
     *         or with {@code null} when checkpoint emission is disabled
     * @throws HoodieIOException if the SQL file cannot be opened or read
     */
    @Override
    protected Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Dataset<Row> rows = null;
        FileSystem fs = HadoopFSUtils.getFs(this.sourceSqlFile, this.sparkContext.hadoopConfiguration(), true);
        // try-with-resources: closing the scanner also closes the stream opened on the file.
        try (Scanner scanner = new Scanner(fs.open(new Path(this.sourceSqlFile)))) {
            scanner.useDelimiter(";");
            while (scanner.hasNext()) {
                String sqlStr = scanner.next().trim();
                if (sqlStr.isEmpty()) {
                    continue;
                }
                LOG.info(sqlStr);
                // Each result overwrites the previous one; only the last statement's dataset is returned.
                rows = this.sparkSession.sql(sqlStr);
            }
            // Scanner hides read failures and simply reports end of input; surface them
            // so a partially read file is not mistaken for a complete one.
            IOException readFailure = scanner.ioException();
            if (readFailure != null) {
                throw readFailure;
            }
            Checkpoint checkpoint = this.shouldEmitCheckPoint
                    ? new StreamerCheckpointV2(String.valueOf(System.currentTimeMillis()))
                    : null;
            // NOTE(review): rows is still null here when the file held no non-empty statement;
            // it is passed to Option.of unchanged, exactly as before — confirm that is intended.
            return Pair.of(Option.of(rows), checkpoint);
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Error reading source SQL file.", ioe);
        }
    }
}

