/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigquery.sqlengine;

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteResult;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkConfig;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.sink.Operation;
import io.cdap.plugin.gcp.bigquery.sink.PartitionType;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngineConfig;
import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryWrite {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryWrite.class);
    private static final Gson GSON = new Gson();
    public static final String SQL_OUTPUT_JOB_ID = "jobId";
    public static final String SQL_OUTPUT_CONFIG = "config";
    public static final String SQL_OUTPUT_FIELDS = "fields";
    public static final String SQL_OUTPUT_SCHEMA = "schema";
    private static final Type LIST_OF_STRINGS_TYPE = new TypeToken<ArrayList<String>>(){}.getType();
    private static final String BQ_PUSHDOWN_OPERATION_TAG = "write";
    private final BigQuerySQLEngineConfig sqlEngineConfig;
    private final BigQuery bigQuery;
    private final String datasetName;
    private final SQLWriteRequest writeRequest;
    private final TableId sourceTableId;
    private final Metrics metrics;

    /**
     * Builds a write operation; instances are obtained through {@link #getInstance}.
     *
     * @param datasetName     dataset name stored on this instance
     * @param sqlEngineConfig SQL engine configuration (read for the job project and job priority)
     * @param bigQuery        BigQuery client used for every dataset, table and job call
     * @param writeRequest    write request that {@link #write()} will execute
     * @param sourceTableId   table whose rows are copied to the destination
     * @param metrics         metrics handle passed along when job metrics are logged
     */
    private BigQueryWrite(String datasetName, BigQuerySQLEngineConfig sqlEngineConfig, BigQuery bigQuery, SQLWriteRequest writeRequest, TableId sourceTableId, Metrics metrics) {
        // Collaborators first, then the request-specific state.
        this.bigQuery = bigQuery;
        this.sqlEngineConfig = sqlEngineConfig;
        this.metrics = metrics;
        this.datasetName = datasetName;
        this.writeRequest = writeRequest;
        this.sourceTableId = sourceTableId;
    }

    /**
     * Static factory for {@link BigQueryWrite}.
     *
     * @param datasetName     dataset name stored on the new instance
     * @param sqlEngineConfig SQL engine configuration
     * @param bigQuery        BigQuery client
     * @param writeRequest    write request to execute
     * @param sourceTableId   table to copy rows from
     * @param metrics         metrics handle used when logging job metrics
     * @return a new write operation wired with the supplied arguments
     */
    public static BigQueryWrite getInstance(String datasetName, BigQuerySQLEngineConfig sqlEngineConfig, BigQuery bigQuery, SQLWriteRequest writeRequest, TableId sourceTableId, Metrics metrics) {
        BigQueryWrite instance = new BigQueryWrite(
            datasetName,
            sqlEngineConfig,
            bigQuery,
            writeRequest,
            sourceTableId,
            metrics);
        return instance;
    }

    /**
     * Executes the write request held by this instance.
     *
     * <p>Any exception raised by the internal write is logged and converted into a failure
     * result. If a destination table was created before the failure, a best-effort delete of
     * that table is attempted.
     *
     * @return the result produced by the internal write, or a failure result for the request's
     *         dataset name when an exception occurred
     */
    public SQLWriteResult write() {
        // Populated by createTable(...) once a new destination table exists, so it can be cleaned up.
        AtomicReference<TableId> newDestinationTable = new AtomicReference<>(null);
        boolean interrupted = false;
        try {
            return this.writeInternal(this.writeRequest, newDestinationTable);
        }
        catch (InterruptedException e) {
            interrupted = true;
            LOG.error("Interrupted exception during BigQuery write operation.", e);
        }
        catch (BigQueryException bqe) {
            LOG.error("BigQuery exception during BigQuery write operation", bqe);
        }
        catch (Exception e) {
            LOG.error("Exception during BigQuery write operation", e);
        }
        try {
            TableId createdTable = newDestinationTable.get();
            if (createdTable != null) {
                this.tryDeleteTable(createdTable);
            }
        }
        finally {
            // Restore the interrupt flag only after cleanup, so the delete call is not itself
            // cut short, while callers up the stack can still observe the interruption.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return SQLWriteResult.faiure(this.writeRequest.getDatasetName());
    }

    /**
     * Copies the source table into the sink's destination table with a BigQuery query job.
     *
     * <p>Returns an "unsupported" result, without running a job, when the output belongs to
     * another SQL engine, when either dataset does not exist, when the datasets are in different
     * locations, or when the sink is an INSERT with Truncate Table enabled. Otherwise the
     * destination table is created (or its schema relaxed when allowed) and an INSERT or
     * UPDATE/UPSERT query job is submitted and awaited.
     *
     * @param writeRequest        request carrying the dataset name and the serialized sink arguments
     * @param newDestinationTable set to the destination table id if this call creates the table
     * @return success with the number of rows written, failure if the job reported an error or
     *         no longer exists, or unsupported as described above
     * @throws BigQueryException    if a BigQuery API call fails
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    private SQLWriteResult writeInternal(SQLWriteRequest writeRequest, AtomicReference<TableId> newDestinationTable) throws BigQueryException, InterruptedException {
        String datasetName = writeRequest.getDatasetName();
        if (!BigQuerySQLEngine.class.getName().equals(writeRequest.getOutput().getSqlEngineClassName())) {
            LOG.debug("Got output for another SQL engine {}, skipping", writeRequest.getOutput().getSqlEngineClassName());
            return SQLWriteResult.unsupported(datasetName);
        }

        // Sink settings travel as JSON strings in the output arguments.
        Map<String, String> arguments = writeRequest.getOutput().getArguments();
        String jobId = arguments.get(SQL_OUTPUT_JOB_ID);
        BigQuerySinkConfig sinkConfig = GSON.fromJson(arguments.get(SQL_OUTPUT_CONFIG), BigQuerySinkConfig.class);
        io.cdap.cdap.api.data.schema.Schema schema = GSON.fromJson(arguments.get(SQL_OUTPUT_SCHEMA), io.cdap.cdap.api.data.schema.Schema.class);
        List<String> fields = GSON.fromJson(arguments.get(SQL_OUTPUT_FIELDS), LIST_OF_STRINGS_TYPE);

        String destinationProject = sinkConfig.getDatasetProject();
        String destinationDataset = sinkConfig.getDataset();
        String destinationTableName = sinkConfig.getTable();
        TableId destinationTableId = TableId.of(destinationProject, destinationDataset, destinationTableName);
        boolean allowSchemaRelaxation = sinkConfig.isAllowSchemaRelaxation();
        Operation operation = sinkConfig.getOperation();

        DatasetId sourceDatasetId = DatasetId.of(this.sourceTableId.getProject(), this.sourceTableId.getDataset());
        DatasetId destinationDatasetId = DatasetId.of(destinationProject, destinationDataset);
        Dataset srcDataset = this.bigQuery.getDataset(sourceDatasetId);
        Dataset destDataset = this.bigQuery.getDataset(destinationDatasetId);
        if (srcDataset == null || destDataset == null) {
            LOG.warn("Direct table copy is not supported when the datasets are not created.");
            return SQLWriteResult.unsupported(datasetName);
        }
        if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
            LOG.warn("Direct table copy is only supported if both datasets are in the same location. '{}' is '{}' , '{}' is '{}' .", sourceDatasetId.getDataset(), srcDataset.getLocation(), destinationDatasetId.getDataset(), destDataset.getLocation());
            return SQLWriteResult.unsupported(datasetName);
        }
        if (sinkConfig.isTruncateTableSet() && operation == Operation.INSERT) {
            LOG.warn("Direct table copy is not supported for the INSERT operation when Truncate Table is enabled.");
            return SQLWriteResult.unsupported(datasetName);
        }

        Table srcTable = this.bigQuery.getTable(this.sourceTableId);
        Table destTable = this.bigQuery.getTable(destinationTableId);
        // With no destination table there is nothing to merge into, so an UPSERT becomes a plain INSERT.
        if (destTable == null && operation == Operation.UPSERT) {
            operation = Operation.INSERT;
        }
        if (destTable != null) {
            LOG.info("Destinaton table `{}.{}.{}` already exists.", destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
            if (allowSchemaRelaxation) {
                this.relaxTableSchema(schema, destTable);
            }
        } else {
            this.createTable(schema, destinationTableId, sinkConfig, newDestinationTable);
        }

        QueryJobConfiguration.Builder queryConfigBuilder = operation == Operation.INSERT
            ? this.getInsertQueryJobBuilder(this.sourceTableId, destinationTableId, fields)
            : this.getUpdateUpsertQueryJobBuilder(this.sourceTableId, destinationTableId, fields, sinkConfig);
        QueryJobConfiguration queryConfig = queryConfigBuilder.build();
        JobId bqJobId = JobId.newBuilder().setJob(jobId).setLocation(srcDataset.getLocation()).setProject(this.sqlEngineConfig.getProject()).build();
        Job queryJob = this.bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(bqJobId).build());

        queryJob = queryJob.waitFor();
        // waitFor returns null when the job no longer exists; treat that as a failed write
        // instead of dereferencing it.
        if (queryJob == null) {
            LOG.error("BigQuery Job: '{}' in Project '{}' no longer exists.", jobId, this.sqlEngineConfig.getProject());
            return SQLWriteResult.faiure(datasetName);
        }
        JobStatistics.QueryStatistics queryJobStats = queryJob.getStatistics();
        if (queryJob.getStatus().getError() != null) {
            BigQuerySQLEngineUtils.logJobMetrics(queryJob, this.metrics);
            // NOTE(review): the 'Dataset' placeholder is filled with getDatasetProject() — confirm this is intended.
            LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}", jobId, this.sqlEngineConfig.getProject(), this.sqlEngineConfig.getDatasetProject(), queryJob.getStatus().getError().toString());
            return SQLWriteResult.faiure(datasetName);
        }

        // Prefer the DML row count from the job statistics; otherwise fall back to the source
        // table's row count. If neither is available report 0 rather than failing a job that
        // already completed successfully.
        long numRows;
        if (queryJobStats != null && queryJobStats.getNumDmlAffectedRows() != null) {
            numRows = queryJobStats.getNumDmlAffectedRows();
        } else if (srcTable != null && srcTable.getNumRows() != null) {
            numRows = srcTable.getNumRows().longValue();
        } else {
            numRows = 0L;
        }
        LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows, this.sourceTableId.getProject(), this.sourceTableId.getDataset(), this.sourceTableId.getTable(), destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
        BigQuerySQLEngineUtils.logJobMetrics(queryJob, this.metrics);
        return SQLWriteResult.success(datasetName, numRows);
    }

    /**
     * Relaxes the schema of an existing destination table using the fields of the given schema.
     *
     * @param schema CDAP schema that is converted to a BigQuery schema to obtain the source fields
     * @param table  existing destination table whose current fields are passed as the target
     */
    protected void relaxTableSchema(io.cdap.cdap.api.data.schema.Schema schema, Table table) {
        Schema convertedSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema);
        // Pass a fresh list copy of the source fields rather than the schema's own field list.
        List<Field> sourceFields = new ArrayList<>(convertedSchema.getFields());
        List<Field> destinationFields = table.getDefinition().getSchema().getFields();
        BigQuerySinkUtils.relaxTableSchema(this.bigQuery, table, sourceFields, destinationFields);
    }

    /**
     * Creates the destination table from the sink configuration.
     *
     * <p>Applies time or integer-range partitioning according to the configured partitioning
     * type, clustering when the table is partitioned and clustering fields are configured, a
     * CMEK encryption configuration when a key is set, and the require-partition-filter flag.
     *
     * @param schema              CDAP schema converted into the BigQuery table schema
     * @param tableId             id of the table to create
     * @param sinkConfig          sink configuration supplying partitioning, clustering and CMEK settings
     * @param newDestinationTable receives {@code tableId} when the create call returns a table
     */
    protected void createTable(io.cdap.cdap.api.data.schema.Schema schema, TableId tableId, BigQuerySinkConfig sinkConfig, AtomicReference<TableId> newDestinationTable) {
        Schema tableSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema);
        StandardTableDefinition.Builder definition = StandardTableDefinition.newBuilder();
        definition.setSchema(tableSchema);

        PartitionType partitioning = sinkConfig.getPartitioningType();
        switch (partitioning) {
            case TIME:
                definition.setTimePartitioning(this.getTimePartitioning(sinkConfig));
                break;
            case INTEGER:
                definition.setRangePartitioning(this.getRangePartitioning(sinkConfig));
                break;
            default:
                // Any other partitioning type adds nothing to the definition.
                break;
        }

        // Clustering is only applied to partitioned tables.
        List<String> clusterBy = this.getClusteringOrderFields(sinkConfig);
        boolean partitioned = PartitionType.NONE != partitioning;
        if (partitioned && !clusterBy.isEmpty()) {
            definition.setClustering(this.getClustering(clusterBy));
        }

        TableInfo.Builder infoBuilder = TableInfo.newBuilder(tableId, definition.build());
        if (sinkConfig.getCmekKey() != null) {
            infoBuilder.setEncryptionConfiguration(this.getEncyptionConfiguration(sinkConfig));
        }
        if (sinkConfig.isPartitionFilterRequired()) {
            infoBuilder.setRequirePartitionFilter(true);
        }

        Table created = this.bigQuery.create(infoBuilder.build());
        if (created != null) {
            // Record the new table so the caller can remove it if the write later fails.
            newDestinationTable.set(tableId);
        }
    }

    /**
     * Attempts to delete the given table; a {@link BigQueryException} is logged, not rethrown.
     *
     * @param table id of the table to delete
     */
    protected void tryDeleteTable(TableId table) {
        try {
            this.bigQuery.delete(table);
        } catch (BigQueryException bqe) {
            // The exception is the trailing argument after the three placeholder values.
            String project = table.getProject();
            String dataset = table.getDataset();
            String name = table.getTable();
            LOG.error("Unable to delete table {}.{}.{}. This may cause the pipeline to fail", project, dataset, name, bqe);
        }
    }

    /**
     * Builds the query job configuration that appends the selected source columns to the
     * destination table.
     *
     * @param sourceTableId      table to select from
     * @param destinationTableId table the query results are appended to (it is never created by the job)
     * @param fields             column names placed in the SELECT list, joined with commas
     * @return builder configured with destination, dispositions, priority and job labels
     */
    protected QueryJobConfiguration.Builder getInsertQueryJobBuilder(TableId sourceTableId, TableId destinationTableId, List<String> fields) {
        String selectList = String.join(",", fields);
        String query = "SELECT " + selectList + " FROM `" + sourceTableId.getProject() + "." + sourceTableId.getDataset() + "." + sourceTableId.getTable() + "`";
        LOG.info("Copying data from `{}.{}.{}` to `{}.{}.{}` using SQL statement: {} ",
            sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
            destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable(),
            query);

        QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(query);
        return builder
            .setDestinationTable(destinationTableId)
            .setCreateDisposition(JobInfo.CreateDisposition.CREATE_NEVER)
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .setPriority(this.sqlEngineConfig.getJobPriority())
            .setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG));
    }

    /**
     * Builds the query job configuration for an UPDATE or UPSERT from the source table into the
     * destination table.
     *
     * @param sourceTableId      table to read from
     * @param destinationTableId table to update
     * @param fields             column names taking part in the operation
     * @param sinkConfig         sink configuration supplying the operation, table keys, dedupe keys
     *                           and partition filter
     * @return builder configured with the generated statement, priority and job labels
     */
    protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId sourceTableId, TableId destinationTableId, List<String> fields, BigQuerySinkConfig sinkConfig) {
        List<String> tableKeys = splitAndTrim(sinkConfig.getRelationTableKey());
        List<String> dedupeByKeys = splitAndTrim(sinkConfig.getDedupeBy());
        String partitionFilter = sinkConfig.getPartitionFilter();

        String query = BigQuerySinkUtils.generateUpdateUpsertQuery(
            sinkConfig.getOperation(), sourceTableId, destinationTableId, fields, tableKeys, dedupeByKeys, partitionFilter);
        LOG.info("Copying data from `{}.{}.{}` to `{}.{}.{}` using SQL statement: {} ",
            sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
            destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable(),
            query);

        QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(query);
        builder.setPriority(this.sqlEngineConfig.getJobPriority());
        return builder.setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG));
    }

    /**
     * Splits a comma-separated value into trimmed tokens; a null input gives an empty list.
     * Empty tokens are kept, not filtered out.
     */
    private static List<String> splitAndTrim(String commaSeparated) {
        List<String> tokens = new ArrayList<>();
        if (commaSeparated != null) {
            for (String token : commaSeparated.split(",")) {
                tokens.add(token.trim());
            }
        }
        return tokens;
    }

    /**
     * Builds a DAY time-partitioning definition, using the configured partition field when set.
     *
     * @param config sink configuration providing the optional partition-by field
     * @return the time partitioning to apply to a new table
     */
    protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {
        TimePartitioning.Builder builder = TimePartitioning.newBuilder(TimePartitioning.Type.DAY);
        String partitionField = config.getPartitionByField();
        if (partitionField == null) {
            // No field configured: leave the builder without a partition field.
            return builder.build();
        }
        builder.setField(partitionField);
        return builder.build();
    }

    /**
     * Builds an integer-range partitioning definition from the sink configuration.
     *
     * @param config sink configuration providing the range bounds and optional partition-by field
     * @return the range partitioning to apply to a new table
     */
    protected RangePartitioning getRangePartitioning(BigQuerySinkConfig config) {
        RangePartitioning.Range range = this.getRangePartitioningRange(config);
        RangePartitioning.Builder builder = RangePartitioning.newBuilder();
        builder.setRange(range);
        String partitionField = config.getPartitionByField();
        if (partitionField != null) {
            builder.setField(partitionField);
        }
        return builder.build();
    }

    /**
     * Builds the start/end/interval range for integer partitioning; each value that is not
     * configured falls back to {@code 0}.
     *
     * @param config sink configuration providing the optional range start, end and interval
     * @return the populated range
     */
    protected RangePartitioning.Range getRangePartitioningRange(BigQuerySinkConfig config) {
        Long configuredStart = config.getRangeStart();
        long start = configuredStart == null ? 0L : configuredStart;

        Long configuredEnd = config.getRangeEnd();
        long end = configuredEnd == null ? 0L : configuredEnd;

        Long configuredInterval = config.getRangeInterval();
        long interval = configuredInterval == null ? 0L : configuredInterval;

        RangePartitioning.Range.Builder builder = RangePartitioning.Range.newBuilder();
        builder.setStart(start);
        builder.setEnd(end);
        builder.setInterval(interval);
        return builder.build();
    }

    /**
     * Parses the comma-separated clustering order into trimmed, non-empty field names.
     *
     * @param config sink configuration providing the optional clustering order string
     * @return the clustering fields in configured order; empty when none are configured
     */
    List<String> getClusteringOrderFields(BigQuerySinkConfig config) {
        String configured = config.getClusteringOrder();
        String clusteringOrder = configured == null ? "" : configured;
        List<String> fieldNames = new ArrayList<>();
        for (String rawName : clusteringOrder.split(",")) {
            String name = rawName.trim();
            // Skip blanks produced by an empty setting or stray commas.
            if (!name.isEmpty()) {
                fieldNames.add(name);
            }
        }
        return fieldNames;
    }

    /**
     * Wraps the given field names in a BigQuery clustering definition.
     *
     * @param clusteringFields field names to cluster by, in order
     * @return the clustering definition
     */
    protected Clustering getClustering(List<String> clusteringFields) {
        return Clustering.newBuilder()
            .setFields(clusteringFields)
            .build();
    }

    /**
     * Builds the encryption configuration that uses the sink's CMEK key as the KMS key name.
     *
     * @param config sink configuration providing the CMEK key
     * @return the encryption configuration for a new table
     */
    protected EncryptionConfiguration getEncyptionConfiguration(BigQuerySinkConfig config) {
        EncryptionConfiguration.Builder builder = EncryptionConfiguration.newBuilder();
        builder.setKmsKeyName(config.getCmekKey());
        return builder.build();
    }
}

