/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.common.base.Strings;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for batch sinks that write {@link StructuredRecord}s to BigQuery.
 *
 * <p>{@link #prepareRun} builds the BigQuery client and the base Hadoop {@link Configuration},
 * resolves the staging bucket and then delegates to {@link #prepareRunInternal}. Subclasses call
 * {@link #initOutput} to register one output per destination table. Every run is tagged with a
 * random {@link UUID} that is used for the temporary GCS paths.
 */
public abstract class AbstractBigQuerySink
extends BatchSink<StructuredRecord, StructuredRecord, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBigQuerySink.class);
    /** Format of the per-run temporary location: bucket name followed by the run id. */
    private static final String GCS_PATH_FORMAT = "gs://%s/%s";
    /** Nesting level at which {@link #validateRecordDepth(Schema, FailureCollector)} reports a failure. */
    private static final int MAX_RECORD_DEPTH = 15;
    /** Upload chunk size applied when the plugin config does not provide one. */
    private static final String DEFAULT_GCS_CHUNK_SIZE = "8388608";
    public static final String RECORDS_UPDATED_METRIC = "records.updated";
    /** Identifier of this run; used to derive temporary GCS locations. */
    private final UUID runUUID = UUID.randomUUID();
    /** Configuration shared by all outputs of this sink; assigned in {@link #prepareRun}. */
    protected Configuration baseConfiguration;
    /** BigQuery client; assigned in {@link #prepareRun}. */
    protected BigQuery bigQuery;

    /**
     * Prepares the run: validates, creates the BigQuery and Storage clients, builds the base
     * configuration, resolves the staging bucket and finally calls {@link #prepareRunInternal}.
     * Dataset/bucket resources are only created when the context is not in preview mode.
     *
     * @param context the batch sink context of the current run
     * @throws Exception if validation fails or any of the preparation steps throws
     */
    public final void prepareRun(BatchSinkContext context) throws Exception {
        this.prepareRunValidation(context);
        AbstractBigQuerySinkConfig config = this.getConfig();
        String serviceAccount = config.getServiceAccount();
        // No explicit service account configured: pass null credentials to the client factories.
        Credentials credentials = serviceAccount == null
            ? null
            : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
        String project = config.getProject();
        this.bigQuery = GCPUtils.getBigQuery(project, credentials);

        FailureCollector collector = context.getFailureCollector();
        CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
        // Surface any failure collected while resolving the CMEK key before going further.
        collector.getOrThrowException();
        this.baseConfiguration = this.getBaseConfiguration(cmekKeyName);

        DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
        Dataset dataset = this.bigQuery.getDataset(datasetId, new BigQuery.DatasetOption[0]);
        Storage storage = GCPUtils.getStorage(project, credentials);
        String bucketName = BigQueryUtil.getStagingBucketName(
            context.getArguments().asMap(), config.getLocation(), dataset, config.getBucket());
        bucketName = BigQuerySinkUtils.configureBucket(this.baseConfiguration, bucketName, this.runUUID.toString());
        Bucket bucket = storage.get(bucketName, new Storage.BucketGetOption[0]);
        if (!context.isPreviewEnabled()) {
            BigQuerySinkUtils.createResources(
                this.bigQuery, dataset, datasetId, storage, bucket, bucketName, config.getLocation(), cmekKeyName);
        }
        this.prepareRunInternal(context, this.bigQuery, bucketName);
    }

    /**
     * Deletes the temporary GCS directory of this run. An {@link IOException} raised by the
     * deletion is logged as a warning and not propagated.
     *
     * @param succeeded whether the run succeeded (not used here)
     * @param context the batch sink context of the current run (not used here)
     */
    public void onRunFinish(boolean succeeded, BatchSinkContext context) {
        if (this.baseConfiguration == null) {
            // The configuration is only assigned partway through prepareRun, so it can still be
            // null here. NOTE(review): the delete helper is assumed not to accept a null
            // configuration - confirm against BigQueryUtil.deleteTemporaryDirectory.
            LOG.debug("Base configuration was never initialized; skipping temporary directory cleanup.");
            return;
        }
        String bucket = this.getConfig().getBucket();
        String gcsPath;
        if (bucket == null) {
            gcsPath = String.format("gs://%s", this.runUUID.toString());
        } else {
            gcsPath = String.format(GCS_PATH_FORMAT, bucket, this.runUUID.toString());
        }
        try {
            BigQueryUtil.deleteTemporaryDirectory(this.baseConfiguration, gcsPath);
        }
        catch (IOException e) {
            LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
        }
    }

    /**
     * Emits the input record unchanged as the key of a {@link KeyValue} whose value is
     * {@link NullWritable}.
     *
     * @param input the record to write
     * @param emitter receives the resulting key/value pair
     */
    public void transform(StructuredRecord input, Emitter<KeyValue<StructuredRecord, NullWritable>> emitter) {
        emitter.emit(new KeyValue<>(input, NullWritable.get()));
    }

    /**
     * Configures and registers a single output for the given table, and records lineage for it.
     *
     * @param context the batch sink context the output is added to
     * @param bigQuery the BigQuery client used to look up table fields and the dataset
     * @param outputName name under which the output is registered
     * @param fqn fully-qualified name set on the lineage asset
     * @param tableName destination table name
     * @param tableSchema schema of the records, may be {@code null}
     * @param bucket bucket used to build the temporary GCS path
     * @param collector failure collector passed to the field lookup
     * @param marker optional marker set on the lineage asset when not {@code null}
     * @throws IOException declared for failures of the underlying helper calls
     */
    protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String fqn, String tableName, @Nullable Schema tableSchema, String bucket, FailureCollector collector, @Nullable String marker) throws IOException {
        LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);
        AbstractBigQuerySinkConfig config = this.getConfig();
        List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(
            bigQuery, tableName, tableSchema, config.isAllowSchemaRelaxation(), config.getDatasetProject(),
            config.getDataset(), config.isTruncateTableSet(), collector);

        // Each output works on its own copy so table-specific settings do not leak into the base.
        Configuration configuration = new Configuration(this.baseConfiguration);
        String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, this.runUUID.toString(), tableName);
        DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
        BigQuerySinkUtils.configureOutput(configuration, datasetId, tableName, temporaryGcsPath, fields);

        List<String> fieldNames = fields.stream().map(BigQueryTableFieldSchema::getName).collect(Collectors.toList());
        Dataset dataset = bigQuery.getDataset(datasetId, new BigQuery.DatasetOption[0]);
        // Prefer the location of the existing dataset; fall back to the configured one.
        String location = dataset != null ? dataset.getLocation() : config.getLocation();
        Asset asset;
        if (marker == null) {
            asset = Asset.builder(outputName).setFqn(fqn).setLocation(location).build();
        } else {
            asset = Asset.builder(outputName).setFqn(fqn).setLocation(location).setMarker(marker).build();
        }
        BigQuerySinkUtils.recordLineage(context, asset, tableSchema, fieldNames, tableName);
        context.addOutput(Output.of(outputName, this.getOutputFormatProvider(configuration, tableName, tableSchema)));
    }

    /**
     * @return the plugin configuration of the concrete sink
     */
    protected abstract AbstractBigQuerySinkConfig getConfig();

    /**
     * Called first in {@link #prepareRun}, before any client or configuration is created.
     *
     * @param var1 the batch sink context of the current run
     */
    protected abstract void prepareRunValidation(BatchSinkContext var1);

    /**
     * Called last in {@link #prepareRun}, after the base configuration and staging bucket are set up.
     *
     * @param var1 the batch sink context of the current run
     * @param var2 the BigQuery client created by {@link #prepareRun}
     * @param var3 the resolved staging bucket name
     * @throws IOException if the subclass-specific preparation fails
     */
    protected abstract void prepareRunInternal(BatchSinkContext var1, BigQuery var2, String var3) throws IOException;

    /**
     * Supplies the output format provider registered by {@link #initOutput}.
     *
     * @param var1 the per-output configuration
     * @param var2 the destination table name
     * @param var3 the table schema passed to {@link #initOutput}
     * @return the provider to register with the context
     */
    protected abstract OutputFormatProvider getOutputFormatProvider(Configuration var1, String var2, Schema var3);

    /**
     * Builds the configuration shared by all outputs: the BigQuery base config plus the schema
     * relaxation flag, the write disposition and the GCS upload chunk size.
     *
     * @param cmekKeyName optional CMEK key forwarded to the BigQuery config helper
     * @return the populated base configuration
     * @throws IOException declared for failures while building the BigQuery config
     */
    private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName) throws IOException {
        AbstractBigQuerySinkConfig config = this.getConfig();
        Configuration baseConfiguration = BigQueryUtil.getBigQueryConfig(config.getServiceAccount(), config.getProject(), cmekKeyName, config.getServiceAccountType());
        baseConfiguration.setBoolean("cdap.bq.sink.allow.schema.relaxation", config.isAllowSchemaRelaxation());
        baseConfiguration.setStrings("mapred.bq.output.table.writedisposition", new String[]{config.getWriteDisposition().name()});
        // Use the configured chunk size when present, otherwise the default.
        String configuredChunkSize = config.getGcsChunkSize();
        String gcsChunkSize = Strings.isNullOrEmpty(configuredChunkSize) ? DEFAULT_GCS_CHUNK_SIZE : configuredChunkSize;
        baseConfiguration.set("fs.gs.outputstream.upload.chunk.size", gcsChunkSize);
        return baseConfiguration;
    }

    /**
     * Validates that nested records in the schema do not reach the maximum allowed depth,
     * adding one failure to the collector for every offending field.
     *
     * @param schema the schema to check; {@code null} is accepted and results in no validation
     * @param collector receives a failure for each field that is nested too deeply
     */
    protected void validateRecordDepth(@Nullable Schema schema, FailureCollector collector) {
        this.validateRecordDepth(schema, collector, 0, null);
    }

    /**
     * Recursive worker for {@link #validateRecordDepth(Schema, FailureCollector)}.
     *
     * @param schema the schema whose fields are inspected; nothing happens if it is {@code null}
     *               or has no fields
     * @param collector receives the failures
     * @param depth nesting level of {@code schema}; the top-level schema is at 0
     * @param prefix dotted path prepended to field names in messages; {@code null} means empty
     */
    private void validateRecordDepth(@Nullable Schema schema, FailureCollector collector, int depth, String prefix) {
        if (schema == null) {
            return;
        }
        List<Schema.Field> fields = schema.getFields();
        if (fields == null) {
            return;
        }
        String pathPrefix = prefix == null ? "" : prefix;
        for (Schema.Field field : fields) {
            String fieldName = pathPrefix + field.getName();
            if (depth == MAX_RECORD_DEPTH) {
                // Report the field and do not descend any further below it.
                collector.addFailure(String.format("Field '%s' exceeds BigQuery maximum allowed depth of %d.", fieldName, MAX_RECORD_DEPTH), "Please flatten the schema to contain fewer levels.");
                continue;
            }
            Schema fieldSchema = BigQueryUtil.getNonNullableSchema(field.getSchema());
            switch (fieldSchema.getType()) {
                case RECORD: {
                    this.validateRecordDepth(fieldSchema, collector, depth + 1, fieldName + ".");
                    break;
                }
                case ARRAY: {
                    if (fieldSchema.getComponentSchema() == null) {
                        break;
                    }
                    Schema componentSchema = BigQueryUtil.getNonNullableSchema(fieldSchema.getComponentSchema());
                    // Only an array of records adds a nesting level; other component types keep the depth.
                    int componentDepth = componentSchema.getType() == Schema.Type.RECORD ? depth + 1 : depth;
                    this.validateRecordDepth(componentSchema, collector, componentDepth, fieldName + ".");
                    break;
                }
            }
        }
    }

    /**
     * @return a fresh copy of the base configuration
     * @throws IOException declared for subclasses/callers; not thrown by this implementation
     */
    protected Configuration getOutputConfiguration() throws IOException {
        return new Configuration(this.baseConfiguration);
    }
}

