/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigquery.sqlengine;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.SQLEngineContext;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.engine.sql.capability.DefaultPullCapability;
import io.cdap.cdap.etl.api.engine.sql.capability.PullCapability;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDatasetProducer;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLPullDataset;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLPushDataset;
import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLPullRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLPushRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
import io.cdap.cdap.etl.api.engine.sql.request.SQLRelationDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteResult;
import io.cdap.cdap.etl.api.join.JoinCondition;
import io.cdap.cdap.etl.api.join.JoinDefinition;
import io.cdap.cdap.etl.api.join.JoinStage;
import io.cdap.cdap.etl.api.relational.Capability;
import io.cdap.cdap.etl.api.relational.Engine;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType;
import io.cdap.plugin.gcp.bigquery.relational.BigQueryRelation;
import io.cdap.plugin.gcp.bigquery.relational.SQLExpressionFactory;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceUtils;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryJobType;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryPullDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryPushDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngineConfig;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySelectDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySparkDatasetProducer;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.sqlengine.builder.BigQueryJoinSQLBuilder;
import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="sqlengine")
@Name(value="BigQueryPushdownEngine")
@Description(value="BigQuery SQLEngine implementation, used to push down certain pipeline steps into BigQuery. A GCS bucket is used as staging for the read/write operations performed by this engine. BigQuery is Google's serverless, highly scalable, enterprise data warehouse.")
@Metadata(properties={@MetadataProperty(key="connector", value="BigQuery")})
public class BigQuerySQLEngine
extends BatchSQLEngine<LongWritable, GenericData.Record, StructuredRecord, NullWritable>
implements Engine {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySQLEngine.class);
    public static final String NAME = "BigQueryPushdownEngine";
    private final BigQuerySQLEngineConfig sqlEngineConfig;
    private SQLEngineContext ctx;
    private BigQuery bigQuery;
    private Storage storage;
    private Configuration configuration;
    private String project;
    private String datasetProject;
    private String datasetName;
    private String bucket;
    private String runId;
    private Map<String, BigQuerySQLDataset> datasets;
    private Metrics metrics;

    public BigQuerySQLEngine(BigQuerySQLEngineConfig sqlEngineConfig) {
        this.sqlEngineConfig = sqlEngineConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        this.sqlEngineConfig.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
    }

    public void prepareRun(SQLEngineContext context) throws Exception {
        super.prepareRun(context);
        this.ctx = context;
        this.sqlEngineConfig.validate();
        this.runId = BigQuerySQLEngineUtils.newIdentifier();
        this.datasets = new HashMap<String, BigQuerySQLDataset>();
        String serviceAccount = this.sqlEngineConfig.getServiceAccount();
        GoogleCredentials credentials = serviceAccount == null ? null : GCPUtils.loadServiceAccountCredentials(serviceAccount, this.sqlEngineConfig.isServiceAccountFilePath());
        this.project = this.sqlEngineConfig.getProject();
        this.datasetProject = this.sqlEngineConfig.getDatasetProject();
        this.datasetName = this.sqlEngineConfig.getDataset();
        this.bigQuery = GCPUtils.getBigQuery(this.project, (Credentials)credentials);
        this.storage = GCPUtils.getStorage(this.project, (Credentials)credentials);
        Dataset dataset = this.bigQuery.getDataset(DatasetId.of((String)this.datasetProject, (String)this.datasetName), new BigQuery.DatasetOption[0]);
        this.bucket = BigQueryUtil.getStagingBucketName(context.getRuntimeArguments(), this.sqlEngineConfig.getLocation(), dataset, this.sqlEngineConfig.getBucket());
        String cmekKey = !Strings.isNullOrEmpty((String)this.sqlEngineConfig.cmekKey) ? this.sqlEngineConfig.cmekKey : (String)this.ctx.getRuntimeArguments().get("gcp.cmek.key.name");
        CryptoKeyName cmekKeyName = null;
        if (!Strings.isNullOrEmpty((String)cmekKey)) {
            cmekKeyName = CryptoKeyName.parse((String)cmekKey);
        }
        this.configuration = BigQueryUtil.getBigQueryConfig(this.sqlEngineConfig.getServiceAccount(), this.sqlEngineConfig.getProject(), cmekKeyName, this.sqlEngineConfig.getServiceAccountType());
        String fallbackBucketName = "bqpushdown-" + this.runId;
        this.bucket = BigQuerySinkUtils.configureBucket(this.configuration, this.bucket, fallbackBucketName);
        BigQuerySinkUtils.createResources(this.bigQuery, this.storage, DatasetId.of((String)this.datasetProject, (String)this.datasetName), this.bucket, this.sqlEngineConfig.getLocation(), cmekKeyName);
        BigQuerySourceUtils.configureServiceAccount(this.configuration, this.sqlEngineConfig.connection);
        this.metrics = this.ctx.getMetrics();
    }

    public void onRunFinish(boolean succeeded, SQLEngineContext context) {
        super.onRunFinish(succeeded, context);
        String gcsPath = this.sqlEngineConfig.getBucket() == null ? String.format("gs://%s", this.bucket) : String.format("gs://%s/%s", this.bucket, this.runId);
        try {
            BigQueryUtil.deleteTemporaryDirectory(this.configuration, gcsPath);
        }
        catch (IOException e) {
            LOG.warn("Failed to delete temporary directory '{}': {}", (Object)gcsPath, (Object)e.getMessage());
        }
    }

    public SQLPushDataset<StructuredRecord, StructuredRecord, NullWritable> getPushProvider(SQLPushRequest sqlPushRequest) throws SQLEngineException {
        try {
            BigQueryPushDataset pushDataset = BigQueryPushDataset.getInstance(sqlPushRequest, this.sqlEngineConfig, this.configuration, this.bigQuery, DatasetId.of((String)this.datasetProject, (String)this.datasetName), this.bucket, this.runId);
            LOG.info("Executing Push operation for dataset {} stored in table {}", (Object)sqlPushRequest.getDatasetName(), (Object)pushDataset.getBigQueryTable());
            this.datasets.put(sqlPushRequest.getDatasetName(), pushDataset);
            return pushDataset;
        }
        catch (IOException ioe) {
            throw new SQLEngineException((Throwable)ioe);
        }
    }

    public SQLPullDataset<StructuredRecord, LongWritable, GenericData.Record> getPullProvider(SQLPullRequest sqlPullRequest) throws SQLEngineException {
        if (!this.datasets.containsKey(sqlPullRequest.getDatasetName())) {
            throw new SQLEngineException(String.format("Trying to pull non-existing dataset: '%s", sqlPullRequest.getDatasetName()));
        }
        String table = this.datasets.get(sqlPullRequest.getDatasetName()).getBigQueryTable();
        LOG.info("Executing Pull operation for dataset {} stored in table {}", (Object)sqlPullRequest.getDatasetName(), (Object)table);
        try {
            return BigQueryPullDataset.getInstance(sqlPullRequest, this.configuration, this.bigQuery, DatasetId.of((String)this.datasetProject, (String)this.datasetName), table, this.bucket, this.runId);
        }
        catch (IOException ioe) {
            throw new SQLEngineException((Throwable)ioe);
        }
    }

    public boolean exists(String datasetName) throws SQLEngineException {
        return this.datasets.containsKey(datasetName);
    }

    public boolean canJoin(SQLJoinDefinition sqlJoinDefinition) {
        boolean canJoin = BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition);
        LOG.info("Validating join for stage '{}' can be executed on BigQuery: {}", (Object)sqlJoinDefinition.getDatasetName(), (Object)canJoin);
        return canJoin;
    }

    @VisibleForTesting
    protected static boolean isValidJoinDefinition(SQLJoinDefinition sqlJoinDefinition) {
        ArrayList<String> validationProblems = new ArrayList<String>();
        JoinDefinition joinDefinition = sqlJoinDefinition.getJoinDefinition();
        for (JoinStage inputStage : joinDefinition.getStages()) {
            BigQuerySQLEngineUtils.validateInputStage(inputStage, validationProblems);
        }
        BigQuerySQLEngineUtils.validateOutputSchema(joinDefinition.getOutputSchema(), validationProblems);
        if (joinDefinition.getCondition().getOp() == JoinCondition.Op.EXPRESSION) {
            BigQuerySQLEngineUtils.validateOnExpressionJoinCondition((JoinCondition.OnExpression)joinDefinition.getCondition(), validationProblems);
        }
        if (joinDefinition.getCondition().getOp() == JoinCondition.Op.KEY_EQUALITY) {
            BigQuerySQLEngineUtils.validateJoinOnKeyStages(joinDefinition, validationProblems);
        }
        if (!validationProblems.isEmpty()) {
            LOG.warn("Join operation for stage '{}' could not be executed in BigQuery. Issues found: {}.", (Object)sqlJoinDefinition.getDatasetName(), (Object)String.join((CharSequence)"; ", validationProblems));
        }
        return validationProblems.isEmpty();
    }

    public SQLDataset join(SQLJoinRequest sqlJoinRequest) throws SQLEngineException {
        BigQueryJoinSQLBuilder builder = new BigQueryJoinSQLBuilder(sqlJoinRequest.getJoinDefinition(), DatasetId.of((String)this.datasetProject, (String)this.datasetName), this.getStageNameToBQTableNameMap());
        return this.executeSelect(sqlJoinRequest.getDatasetName(), sqlJoinRequest.getJoinDefinition().getOutputSchema(), BigQueryJobType.JOIN, builder.getQuery());
    }

    @Nullable
    public SQLDatasetProducer getProducer(SQLPullRequest pullRequest, PullCapability capability) {
        if (!this.sqlEngineConfig.shouldUseStorageReadAPI().booleanValue() || capability != DefaultPullCapability.SPARK_RDD_PULL) {
            return null;
        }
        String table = this.datasets.get(pullRequest.getDatasetName()).getBigQueryTable();
        return new BigQuerySparkDatasetProducer(this.sqlEngineConfig, this.datasetProject, this.datasetName, table, pullRequest.getDatasetSchema());
    }

    public Set<PullCapability> getPullCapabilities() {
        if (!this.sqlEngineConfig.shouldUseStorageReadAPI().booleanValue()) {
            return Collections.emptySet();
        }
        return Collections.singleton(DefaultPullCapability.SPARK_RDD_PULL);
    }

    public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException {
        String datasetName = readRequest.getDatasetName();
        if (!BigQuerySQLEngine.class.getName().equals(readRequest.getInput().getSqlEngineClassName())) {
            LOG.debug("Got output for another SQL engine {}, skipping", (Object)readRequest.getInput().getSqlEngineClassName());
            return SQLReadResult.unsupported((String)datasetName);
        }
        String destinationTable = BigQuerySQLEngineUtils.getNewTableName(this.runId);
        TableId destinationTableId = TableId.of((String)this.datasetProject, (String)this.datasetName, (String)destinationTable);
        BigQueryReadDataset readDataset = BigQueryReadDataset.getInstance(datasetName, this.sqlEngineConfig, this.bigQuery, readRequest, destinationTableId, this.metrics);
        SQLReadResult result = readDataset.read();
        if (result.isSuccessful()) {
            this.datasets.put(datasetName, readDataset);
        }
        return result;
    }

    public SQLWriteResult write(SQLWriteRequest writeRequest) {
        String datasetName = writeRequest.getDatasetName();
        if (!BigQuerySQLEngine.class.getName().equals(writeRequest.getOutput().getSqlEngineClassName())) {
            LOG.debug("Got output for another SQL engine {}, skipping", (Object)writeRequest.getOutput().getSqlEngineClassName());
            return SQLWriteResult.unsupported((String)datasetName);
        }
        String sourceTable = this.datasets.get(writeRequest.getDatasetName()).getBigQueryTable();
        TableId sourceTableId = TableId.of((String)this.datasetProject, (String)this.datasetName, (String)sourceTable);
        BigQueryWrite bigQueryWrite = BigQueryWrite.getInstance(datasetName, this.sqlEngineConfig, this.bigQuery, writeRequest, sourceTableId, this.metrics);
        return bigQueryWrite.write();
    }

    public void cleanup(String datasetName) throws SQLEngineException {
        BigQuerySQLDataset bqDataset = this.datasets.get(datasetName);
        if (bqDataset == null) {
            return;
        }
        LOG.info("Cleaning up dataset {}", (Object)datasetName);
        SQLEngineException ex = null;
        try {
            this.cancelJob(datasetName, bqDataset);
        }
        catch (BigQueryException e) {
            LOG.error("Exception when cancelling BigQuery job '{}' for stage '{}': {}", new Object[]{bqDataset.getJobId(), datasetName, e.getMessage()});
            ex = new SQLEngineException(String.format("Exception when executing cleanup for stage '%s'", datasetName), (Throwable)e);
        }
        try {
            this.deleteTable(datasetName, bqDataset);
        }
        catch (BigQueryException e) {
            LOG.error("Exception when deleting BigQuery table '{}' for stage '{}': {}", new Object[]{bqDataset.getBigQueryTable(), datasetName, e.getMessage()});
            if (ex == null) {
                ex = new SQLEngineException(String.format("Exception when executing cleanup for stage '%s'", datasetName), (Throwable)e);
            }
            ex.addSuppressed((Throwable)e);
        }
        try {
            this.deleteTempFolder(bqDataset);
        }
        catch (IOException e) {
            LOG.error("Failed to delete temporary directory '{}' for stage '{}': {}", new Object[]{bqDataset.getGCSPath(), datasetName, e.getMessage()});
            if (ex == null) {
                ex = new SQLEngineException(String.format("Exception when executing cleanup for stage '%s'", datasetName), (Throwable)e);
            }
            ex.addSuppressed((Throwable)e);
        }
        if (ex != null) {
            throw ex;
        }
    }

    public Set<Capability> getCapabilities() {
        return Collections.singleton(StringExpressionFactoryType.SQL);
    }

    public List<ExpressionFactory<?>> getExpressionFactories() {
        return Collections.singletonList(new SQLExpressionFactory());
    }

    public Relation getRelation(SQLRelationDefinition relationDefinition) {
        LinkedHashSet<String> columnSet = new LinkedHashSet<String>();
        List fields = relationDefinition.getSchema().getFields();
        if (fields != null) {
            for (Schema.Field field : fields) {
                columnSet.add(field.getName());
            }
        }
        String datasetName = relationDefinition.getDatasetName();
        return BigQueryRelation.getInstance(datasetName, columnSet, (FeatureFlagsProvider)this.ctx);
    }

    public Engine getRelationalEngine() {
        return this;
    }

    public boolean supportsRelationalTranform() {
        return true;
    }

    public boolean supportsInputSchema(Schema schema) {
        return BigQuerySQLEngineUtils.isSupportedSchema(schema);
    }

    public boolean supportsOutputSchema(Schema schema) {
        return BigQuerySQLEngineUtils.isSupportedSchema(schema);
    }

    public Set<String> getIncludedStageNames() {
        return this.sqlEngineConfig.getIncludedStages();
    }

    public Set<String> getExcludedStageNames() {
        return this.sqlEngineConfig.getExcludedStages();
    }

    public boolean canTransform(SQLTransformDefinition transformDefinition) {
        Relation relation = transformDefinition.getOutputRelation();
        return relation instanceof BigQueryRelation && relation.isValid();
    }

    public SQLDataset transform(SQLTransformRequest context) throws SQLEngineException {
        BigQueryRelation relation = (BigQueryRelation)context.getOutputRelation();
        Map<String, BigQuerySQLDataset> bqDatasets = context.getInputDataSets().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> (BigQuerySQLDataset)e.getValue()));
        relation.setInputDatasets(bqDatasets);
        return this.executeSelect(context.getOutputDatasetName(), context.getOutputSchema(), BigQueryJobType.TRANSFORM, relation.getSQLStatement());
    }

    private BigQuerySelectDataset executeSelect(String datasetName, Schema outputSchema, BigQueryJobType jobType, String query) {
        LOG.info("Executing {} operation for dataset {}", (Object)jobType.getType(), (Object)datasetName);
        String jobId = BigQuerySQLEngineUtils.newIdentifier();
        String table = BigQuerySQLEngineUtils.getNewTableName(this.runId);
        BigQuerySQLEngineUtils.createEmptyTable(this.sqlEngineConfig, this.bigQuery, this.project, this.datasetName, table);
        BigQuerySelectDataset selectDataset = BigQuerySelectDataset.getInstance(datasetName, outputSchema, this.sqlEngineConfig, this.bigQuery, this.project, DatasetId.of((String)this.datasetProject, (String)this.datasetName), table, jobId, jobType, query, this.metrics).execute();
        this.datasets.put(datasetName, selectDataset);
        LOG.info("Executed {} operation for dataset {}", (Object)jobType.getType(), (Object)datasetName);
        return selectDataset;
    }

    protected Map<String, String> getStageNameToBQTableNameMap() {
        return this.datasets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((BigQuerySQLDataset)e.getValue()).getBigQueryTable()));
    }

    protected void cancelJob(String stageName, BigQuerySQLDataset bqDataset) throws BigQueryException {
        if (this.sqlEngineConfig.shouldRetainTables().booleanValue()) {
            return;
        }
        String jobId = bqDataset.getJobId();
        if (jobId == null) {
            return;
        }
        String tableName = bqDataset.getBigQueryTable();
        Job job = this.bigQuery.getJob(jobId, new BigQuery.JobOption[0]);
        if (job == null) {
            return;
        }
        if (!job.cancel()) {
            LOG.error("Unable to cancel BigQuery job '{}' for table '{}' and stage '{}'", new Object[]{jobId, tableName, stageName});
        }
    }

    protected void deleteTable(String stageName, BigQuerySQLDataset bqDataset) throws BigQueryException {
        if (this.sqlEngineConfig.shouldRetainTables().booleanValue()) {
            return;
        }
        String tableName = bqDataset.getBigQueryTable();
        TableId tableId = TableId.of((String)this.datasetProject, (String)this.datasetName, (String)tableName);
        if (this.bigQuery.getTable(tableId, new BigQuery.TableOption[0]) != null) {
            this.bigQuery.delete(tableId);
        }
    }

    protected void deleteTempFolder(BigQuerySQLDataset bqDataset) throws IOException {
        String gcsPath = bqDataset.getGCSPath();
        if (gcsPath == null) {
            return;
        }
        BigQueryUtil.deleteTemporaryDirectory(this.configuration, gcsPath);
    }
}

