/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineOutput;
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink;
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
import io.cdap.plugin.gcp.bigquery.sink.BigQueryOutputFormatProvider;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkConfig;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.sink.Operation;
import io.cdap.plugin.gcp.bigquery.sink.PartitionType;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch sink that writes records to a BigQuery table.
 *
 * <p>At configure time the sink validates its configuration and, when connection details are
 * available, checks the configured (or input) schema against the existing table. In
 * {@link #prepareRunInternal} it resolves the output schema and records table, partitioning and
 * clustering options in the Hadoop configuration under {@code cdap.bq.sink.*} keys. It then
 * registers both a regular output and a {@link SQLEngineOutput} for {@link BigQuerySQLEngine}.
 * After a successful run, {@link #onRunFinish} looks up the BigQuery job identified by
 * {@link #jobId} and emits {@code records.updated} and {@code bytes.processed} metrics.
 *
 * <p>This file imports both {@code com.google.cloud.bigquery.Schema} and
 * {@code io.cdap.cdap.api.data.schema.Schema}. Both are written fully qualified below so that
 * each use is unambiguous.
 */
@Plugin(type="batchsink")
@Name(value="BigQueryTable")
@Description(value="This sink writes to a BigQuery table. BigQuery is Google's serverless, highly scalable, enterprise data warehouse. Data is first written to a temporary location on Google Cloud Storage, then loaded into BigQuery from there.")
@Metadata(properties={@MetadataProperty(key="connector", value="BigQuery")})
public final class BigQuerySink
extends AbstractBigQuerySink {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySink.class);
    private static final Gson GSON = new Gson();
    public static final String NAME = "BigQueryTable";
    /**
     * Maximum number of Integer.MAX_VALUE-sized chunks emitted for the records.updated metric.
     * Metrics.count only accepts an int delta, so larger totals are split into chunks.
     */
    private static final int MAX_ROW_METRIC_CHUNKS = 10000;
    private final BigQuerySinkConfig config;
    /**
     * Random identifier for this run. It is written to "cdap.bq.sink.job.id" and later used to
     * look up the BigQuery job when emitting metrics.
     */
    private final String jobId = UUID.randomUUID().toString();

    public BigQuerySink(BigQuerySinkConfig config) {
        this.config = config;
    }

    /**
     * Validates the sink configuration.
     *
     * <p>When project and credential information is available, this also validates the
     * configured schema (falling back to the input schema) against the target table.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = configurer.getFailureCollector();
        io.cdap.cdap.api.data.schema.Schema inputSchema = configurer.getInputSchema();
        io.cdap.cdap.api.data.schema.Schema configuredSchema = this.config.getSchema(collector);
        this.config.validate(inputSchema, configuredSchema, collector, Collections.emptyMap());
        // Skip table-level validation when we cannot build a BigQuery client: no connection,
        // no project, no credential type, or file-path credentials that are not available
        // (autoServiceAccountUnavailable).
        if (this.config.connection == null
            || this.config.tryGetProject() == null
            || this.config.getServiceAccountType() == null
            || (Boolean.TRUE.equals(this.config.isServiceAccountFilePath())
                && this.config.connection.autoServiceAccountUnavailable())) {
            return;
        }
        io.cdap.cdap.api.data.schema.Schema schema = configuredSchema == null ? inputSchema : configuredSchema;
        if (schema != null) {
            this.validateConfiguredSchema(schema, collector);
        }
    }

    @Override
    protected BigQuerySinkConfig getConfig() {
        return this.config;
    }

    /** Re-validates the configuration with runtime arguments and fails fast on errors. */
    @Override
    protected void prepareRunValidation(BatchSinkContext context) {
        FailureCollector collector = context.getFailureCollector();
        this.config.validate(context.getInputSchema(), this.config.getSchema(collector), collector, context.getArguments().asMap());
        collector.getOrThrowException();
    }

    /**
     * Resolves the output schema, populates the sink's Hadoop configuration, and registers the
     * regular output and the SQL engine output.
     */
    @Override
    protected void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery, String bucket) throws IOException {
        FailureCollector collector = context.getFailureCollector();
        io.cdap.cdap.api.data.schema.Schema configSchema = this.config.getSchema(collector);
        io.cdap.cdap.api.data.schema.Schema outputSchema = configSchema == null ? context.getInputSchema() : configSchema;
        this.configureTable(outputSchema);
        this.configureBigQuerySink();
        String fqn = BigQueryUtil.getFQN(this.config.getDatasetProject(), this.config.getDataset(), this.config.getTable());
        this.initOutput(context, bigQuery, this.config.getReferenceName(), fqn, this.config.getTable(), outputSchema, bucket, collector, null);
        this.initSQLEngineOutput(context, bigQuery, this.config.getReferenceName(), context.getStageName(), this.config.getTable(), outputSchema, collector);
    }

    /** Emits row and byte metrics after the run; metric failures are logged and never fail the run. */
    @Override
    public void onRunFinish(boolean succeeded, BatchSinkContext context) {
        super.onRunFinish(succeeded, context);
        try {
            this.recordMetric(succeeded, context);
        }
        catch (Exception exception) {
            LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.", exception);
        }
    }

    /**
     * Registers a {@link SQLEngineOutput} so the BigQuery SQL engine can write this stage's output
     * directly.
     *
     * <p>The engine receives a derived job id ({@code <jobId>_write}), the serialized config, the
     * schema, and the BigQuery field names. Nothing is registered when {@code tableSchema} is null.
     */
    void initSQLEngineOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String stageName, String tableName, @Nullable io.cdap.cdap.api.data.schema.Schema tableSchema, FailureCollector collector) {
        if (tableSchema == null) {
            LOG.debug("BigQuery SQL Engine Output was not initialized. Schema was empty.");
            return;
        }
        List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema, this.getConfig().isAllowSchemaRelaxation(), this.config.getDatasetProject(), this.config.getDataset(), this.config.isTruncateTableSet(), collector);
        List<String> fieldNames = fields.stream().map(BigQueryTableFieldSchema::getName).collect(Collectors.toList());
        Map<String, String> arguments = ImmutableMap.<String, String>builder()
            .put("jobId", this.jobId + "_write")
            .put("config", GSON.toJson(this.config))
            .put("schema", GSON.toJson(tableSchema))
            .put("fields", GSON.toJson(fieldNames))
            .build();
        context.addOutput((Output) new SQLEngineOutput(outputName, stageName, BigQuerySQLEngine.class.getName(), arguments));
    }

    /**
     * Emits {@code records.updated} and {@code bytes.processed} metrics for the BigQuery job
     * started by this sink.
     *
     * <p>Does nothing if the run failed or the job cannot be found.
     */
    void recordMetric(boolean succeeded, BatchSinkContext context) {
        if (!succeeded) {
            return;
        }
        Job queryJob = this.findJob();
        if (queryJob == null) {
            LOG.warn("Unable to find BigQuery job. No metric will be emitted for the number of affected rows.");
            return;
        }
        long totalRows = this.getTotalRows(queryJob);
        LOG.info("Job {} affected {} rows", queryJob.getJobId(), totalRows);
        emitAffectedRows(context, totalRows);
        Map<String, String> tags = ImmutableMap.of("aet", "batchsink", "tpe", NAME);
        long totalBytes = this.getTotalBytes(queryJob);
        context.getMetrics().child(tags).countLong("bytes.processed", totalBytes);
    }

    /**
     * Adds {@code totalRows} to {@code records.updated}.
     *
     * <p>Metrics.count takes an int, so the total is split into chunks of at most
     * Integer.MAX_VALUE. At most MAX_ROW_METRIC_CHUNKS + 1 chunks are emitted.
     */
    private static void emitAffectedRows(BatchSinkContext context, long totalRows) {
        long chunks = totalRows / Integer.MAX_VALUE;
        if (chunks > MAX_ROW_METRIC_CHUNKS) {
            LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
        }
        chunks = Math.min(chunks, MAX_ROW_METRIC_CHUNKS);
        long remaining = totalRows;
        for (long i = 0; i <= chunks && remaining > 0L; i++) {
            int rowCount = (int) Math.min(remaining, Integer.MAX_VALUE);
            context.getMetrics().count("records.updated", rowCount);
            remaining -= rowCount;
        }
    }

    /**
     * Looks up the BigQuery job whose id is {@link #jobId}, in the location of the configured
     * dataset.
     *
     * <p>The job is fetched once and returned directly, rather than fetched twice as before.
     *
     * @return the job, or {@code null} if the dataset or the job does not exist
     */
    @Nullable
    private Job findJob() {
        BigQuerySinkConfig config = this.getConfig();
        DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
        Dataset dataset = this.bigQuery.getDataset(datasetId);
        if (dataset == null) {
            LOG.warn("Dataset {} was not found in project {}", config.getDataset(), config.getDatasetProject());
            return null;
        }
        String location = dataset.getLocation();
        JobId id = JobId.newBuilder().setLocation(location).setJob(this.jobId).build();
        Job job = this.bigQuery.getJob(id);
        if (job == null) {
            LOG.warn("Job {} was not found in location {}", this.jobId, location);
            return null;
        }
        return job;
    }

    /**
     * Returns the number of rows written by the job.
     *
     * <p>For LOAD jobs this is the output row count; for QUERY jobs it is the DML affected-row
     * count. Returns 0 for other job types or when BigQuery did not report the statistic.
     */
    private long getTotalRows(Job queryJob) {
        JobConfiguration.Type type = queryJob.getConfiguration().getType();
        if (type == JobConfiguration.Type.LOAD) {
            return valueOrZero(((JobStatistics.LoadStatistics) queryJob.getStatistics()).getOutputRows());
        }
        if (type == JobConfiguration.Type.QUERY) {
            return valueOrZero(((JobStatistics.QueryStatistics) queryJob.getStatistics()).getNumDmlAffectedRows());
        }
        LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected rows.");
        return 0L;
    }

    /**
     * Returns the bytes loaded (LOAD jobs) or processed (QUERY jobs).
     *
     * <p>Returns 0 for other job types or when BigQuery did not report the statistic.
     */
    private long getTotalBytes(Job queryJob) {
        JobConfiguration.Type type = queryJob.getConfiguration().getType();
        if (type == JobConfiguration.Type.LOAD) {
            long outputBytes = valueOrZero(((JobStatistics.LoadStatistics) queryJob.getStatistics()).getOutputBytes());
            LOG.info("Job {} loaded {} bytes", queryJob.getJobId(), outputBytes);
            return outputBytes;
        }
        if (type == JobConfiguration.Type.QUERY) {
            long processedBytes = valueOrZero(((JobStatistics.QueryStatistics) queryJob.getStatistics()).getTotalBytesProcessed());
            LOG.info("Job {} processed {} bytes", queryJob.getJobId(), processedBytes);
            return processedBytes;
        }
        LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected bytes.");
        return 0L;
    }

    /**
     * Unboxes a statistic, treating null as 0.
     *
     * <p>BigQuery statistics are boxed {@code Long}s and may be absent.
     */
    private static long valueOrZero(@Nullable Long value) {
        return value == null ? 0L : value;
    }

    @Override
    protected OutputFormatProvider getOutputFormatProvider(Configuration configuration, String tableName, io.cdap.cdap.api.data.schema.Schema tableSchema) {
        return new BigQueryOutputFormatProvider(configuration, tableSchema);
    }

    /** Copies sink options (job id, operation, partitioning, clustering, keys) into the base Hadoop configuration. */
    private void configureBigQuerySink() {
        BigQuerySinkConfig config = this.getConfig();
        this.baseConfiguration.set("cdap.bq.sink.job.id", this.jobId);
        if (config.getPartitionByField() != null) {
            this.baseConfiguration.set("cdap.bq.sink.partition.by.field", config.getPartitionByField());
        }
        this.baseConfiguration.setBoolean("cdap.bq.sink.require.partition.filter", config.isPartitionFilterRequired());
        if (config.getClusteringOrder() != null) {
            this.baseConfiguration.set("cdap.bq.sink.clustering.order", config.getClusteringOrder());
        }
        this.baseConfiguration.set("cdap.bq.sink.operation", config.getOperation().name());
        if (config.getRelationTableKey() != null) {
            this.baseConfiguration.set("cdap.bq.sink.table.key", config.getRelationTableKey());
        }
        if (config.getDedupeBy() != null) {
            this.baseConfiguration.set("cdap.bq.sink.dedupe.by", config.getDedupeBy());
        }
        if (config.getPartitionFilter() != null) {
            this.baseConfiguration.set("cdap.bq.sink.partition.filter", config.getPartitionFilter());
        }
        PartitionType partitioningType = config.getPartitioningType();
        this.baseConfiguration.setEnum("cdap.bq.sink.partition.type", partitioningType);
        if (config.getRangeStart() != null) {
            this.baseConfiguration.setLong("cdap.bq.sink.partition.integer.range.start", config.getRangeStart().longValue());
        }
        if (config.getRangeEnd() != null) {
            this.baseConfiguration.setLong("cdap.bq.sink.partition.integer.range.end", config.getRangeEnd().longValue());
        }
        if (config.getRangeInterval() != null) {
            this.baseConfiguration.setLong("cdap.bq.sink.partition.integer.range.interval", config.getRangeInterval().longValue());
        }
    }

    /**
     * Records whether the destination table exists and the comma-separated list of its field names.
     *
     * <p>Field names come from the existing table if there is one, otherwise from {@code schema}.
     */
    private void configureTable(@Nullable io.cdap.cdap.api.data.schema.Schema schema) {
        BigQuerySinkConfig config = this.getConfig();
        Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(), config.getTable(), config.getServiceAccount(), config.isServiceAccountFilePath());
        this.baseConfiguration.setBoolean("cdap.bq.sink.destination.table.exists", table != null);
        List<String> tableFieldsNames = null;
        if (table != null) {
            com.google.cloud.bigquery.Schema bqSchema = Objects.requireNonNull(
                table.getDefinition().getSchema(),
                "BigQuery table " + config.getDataset() + "." + config.getTable() + " has no schema");
            tableFieldsNames = bqSchema.getFields().stream().map(Field::getName).collect(Collectors.toList());
        } else if (schema != null && schema.getFields() != null) {
            tableFieldsNames = schema.getFields().stream()
                .map(io.cdap.cdap.api.data.schema.Schema.Field::getName)
                .collect(Collectors.toList());
        }
        if (tableFieldsNames != null) {
            this.baseConfiguration.set("cdap.bq.sink.table.fields", String.join(",", tableFieldsNames));
        }
    }

    /**
     * Checks {@code schema} against the existing BigQuery table, if any.
     *
     * <p>Uses the INSERT or UPSERT rules depending on the configured operation. Validation is
     * skipped when the config should not connect or when allowSchemaRelaxation is a macro.
     */
    private void validateConfiguredSchema(io.cdap.cdap.api.data.schema.Schema schema, FailureCollector collector) {
        if (!this.config.shouldConnect()) {
            return;
        }
        this.validateRecordDepth(schema, collector);
        String tableName = this.config.getTable();
        Table table = BigQueryUtil.getBigQueryTable(this.config.getDatasetProject(), this.config.getDataset(), tableName, this.config.getServiceAccount(), this.config.isServiceAccountFilePath(), collector);
        if (table == null || this.config.containsMacro("allowSchemaRelaxation")) {
            return;
        }
        com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
        Operation operation = this.config.getOperation();
        if (operation == Operation.INSERT) {
            BigQuerySinkUtils.validateInsertSchema(table, schema, this.config.allowSchemaRelaxation, this.config.isTruncateTableSet(), this.config.getDataset(), collector);
        } else if (operation == Operation.UPSERT) {
            BigQuerySinkUtils.validateSchema(tableName, bqSchema, schema, this.config.allowSchemaRelaxation, this.config.isTruncateTableSet(), this.config.getDataset(), collector);
        }
    }
}

