/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigquery.sqlengine;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.engine.sql.dataset.RecordCollection;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDatasetDescription;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDatasetProducer;
import io.cdap.cdap.etl.api.sql.engine.dataset.SparkRecordCollectionImpl;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngineConfig;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.annotation.Nullable;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SQLDatasetProducer} that loads a BigQuery table into a Spark {@code Dataset<Row>} using the
 * {@code "bigquery"} data source format and wraps the result in a {@link SparkRecordCollectionImpl}.
 *
 * <p>All state is assigned once in the constructor and never modified afterwards.
 */
public class BigQuerySparkDatasetProducer implements SQLDatasetProducer, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySparkDatasetProducer.class);
    private static final String FORMAT = "bigquery";
    private static final String CONFIG_CREDENTIALS_FILE = "credentialsFile";
    private static final String CONFIG_CREDENTIALS = "credentials";

    private final BigQuerySQLEngineConfig config;
    private final String project;
    private final String bqDataset;
    private final String bqTable;
    private final Schema schema;

    /**
     * Creates a producer for a single BigQuery table.
     *
     * @param config    engine configuration; consulted in {@link #produce} for service account settings
     * @param project   first component of the {@code project.dataset.table} path that is loaded
     * @param bqDataset second component of the table path
     * @param bqTable   third component of the table path
     * @param schema    schema whose INT and FLOAT fields drive the column casts applied after loading
     */
    public BigQuerySparkDatasetProducer(BigQuerySQLEngineConfig config, String project, String bqDataset, String bqTable, Schema schema) {
        this.config = config;
        this.project = project;
        this.bqDataset = bqDataset;
        this.bqTable = bqTable;
        this.schema = schema;
    }

    /**
     * @return always {@code null}; this producer does not supply a dataset description
     */
    public SQLDatasetDescription getDescription() {
        return null;
    }

    /**
     * Loads the table {@code project.dataset.table} through the Spark session attached to the current
     * {@link SparkContext}, applies the schema-driven column casts and returns the result as a record
     * collection.
     *
     * @param sqlDataset not read by this implementation; the table location comes from the constructor
     * @return a {@link SparkRecordCollectionImpl} wrapping the loaded (and type-adjusted) dataset
     */
    @Nullable
    public RecordCollection produce(SQLDataset sqlDataset) {
        String tablePath = String.format("%s.%s.%s", this.project, this.bqDataset, this.bqTable);

        SparkContext sparkContext = SparkContext.getOrCreate();
        SparkSession session = SparkSession.builder()
            .appName("spark-bq-connector-reader")
            .sparkContext(sparkContext)
            .getOrCreate();

        DataFrameReader reader = session.read().format(FORMAT);
        this.applyCredentials(reader);

        Dataset<Row> loaded = reader.load(tablePath);
        return new SparkRecordCollectionImpl(this.convertFieldTypes(loaded));
    }

    /**
     * Sets at most one credential option on the reader: the service account file path when the
     * configuration selects it and a path is present, otherwise the Base64-encoded service account
     * JSON when that is selected and present. When neither applies no credential option is set.
     */
    private void applyCredentials(DataFrameReader reader) {
        if (Boolean.TRUE.equals(this.config.isServiceAccountFilePath()) && this.config.getServiceAccountFilePath() != null) {
            reader.option(CONFIG_CREDENTIALS_FILE, this.config.getServiceAccountFilePath());
        } else if (Boolean.TRUE.equals(this.config.isServiceAccountJson()) && this.config.getServiceAccountJson() != null) {
            reader.option(CONFIG_CREDENTIALS, this.encodeBase64(this.config.getServiceAccountJson()));
        }
    }

    /**
     * @param serviceAccountJson service account JSON text
     * @return Base64 encoding of the UTF-8 bytes of {@code serviceAccountJson}
     */
    private String encodeBase64(String serviceAccountJson) {
        return Base64.getEncoder().encodeToString(serviceAccountJson.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Casts every column whose schema field is (nullable or non-nullable) INT to Spark's
     * {@code IntegerType} and every FLOAT field to {@code FloatType}; other columns are untouched.
     *
     * <p>NOTE(review): presumably needed because the BigQuery read yields wider LONG/DOUBLE columns
     * for these fields - confirm against the connector's type mapping.
     *
     * @param ds dataset as loaded from BigQuery
     * @return the dataset with the casts applied, or {@code ds} itself when the schema declares no fields
     */
    private Dataset<Row> convertFieldTypes(Dataset<Row> ds) {
        // A missing schema or a schema without a field list has nothing to convert;
        // return the input instead of failing with a NullPointerException in the loop.
        if (this.schema == null || this.schema.getFields() == null) {
            return ds;
        }

        Dataset<Row> converted = ds;
        for (Schema.Field field : this.schema.getFields()) {
            String fieldName = field.getName();
            Schema fieldSchema = field.getSchema();
            // Look through the nullable wrapper so the check below sees the underlying type.
            if (fieldSchema.isNullable()) {
                fieldSchema = fieldSchema.getNonNullable();
            }

            Schema.Type fieldType = fieldSchema.getType();
            if (fieldType == Schema.Type.INT) {
                LOG.trace("Converting field {} to Integer", fieldName);
                converted = converted.withColumn(fieldName, converted.col(fieldName).cast(DataTypes.IntegerType));
            } else if (fieldType == Schema.Type.FLOAT) {
                LOG.trace("Converting field {} to Float", fieldName);
                converted = converted.withColumn(fieldName, converted.col(fieldName).cast(DataTypes.FloatType));
            }
        }
        return converted;
    }
}

