/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.dataplex.sink;

import com.google.api.gax.rpc.ApiException;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Table;
import com.google.cloud.dataplex.v1.AssetName;
import com.google.cloud.dataplex.v1.CreateEntityRequest;
import com.google.cloud.dataplex.v1.DataplexServiceClient;
import com.google.cloud.dataplex.v1.Entity;
import com.google.cloud.dataplex.v1.EntityName;
import com.google.cloud.dataplex.v1.GetEntityRequest;
import com.google.cloud.dataplex.v1.MetadataServiceClient;
import com.google.cloud.dataplex.v1.Schema;
import com.google.cloud.dataplex.v1.StorageFormat;
import com.google.cloud.dataplex.v1.StorageSystem;
import com.google.cloud.dataplex.v1.UpdateEntityRequest;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.validation.FormatContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.sink.PartitionType;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.dataplex.common.util.DataplexUtil;
import io.cdap.plugin.gcp.dataplex.sink.DataplexOutputFormatProvider;
import io.cdap.plugin.gcp.dataplex.sink.config.DataplexBatchSinkConfig;
import io.cdap.plugin.gcp.gcs.StorageClient;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CDAP batch sink plugin, registered under the name {@code Dataplex}, that writes
 * {@link StructuredRecord}s to a Dataplex asset.
 *
 * <p>The configured asset type selects the code path used throughout the class:
 * {@code BIGQUERY_DATASET} routes records through the BigQuery output configuration, while
 * {@code STORAGE_BUCKET} writes files under a {@code gs://} path and can optionally create or
 * update a Dataplex entity once the run has succeeded.
 */
@Plugin(type="batchsink")
@Name(value="Dataplex")
@Description(value="Ingests and processes data within Dataplex.")
public final class DataplexBatchSink
extends BatchSink<StructuredRecord, Object, Object> {
    /** Plugin name; matches the value of the {@code @Name} annotation on this class. */
    public static final String NAME = "Dataplex";
    private static final Logger LOG = LoggerFactory.getLogger(DataplexBatchSink.class);
    /** Name of the stage metric that receives the number of written/affected records. */
    private static final String RECORDS_UPDATED_METRIC = "records.updated";
    private final DataplexBatchSinkConfig config;
    /** Per-instance id; used as the BigQuery job id, in the fallback bucket name and in temporary GCS paths. */
    private final UUID runUUID = UUID.randomUUID();
    /** Hadoop configuration for the BigQuery path; assigned in {@code prepareRunBigQueryDataset}. */
    protected Configuration baseConfiguration;
    /** BigQuery client for the dataset's project; assigned in {@code prepareRunBigQueryDataset}. */
    protected BigQuery bigQuery;
    /** Output directory of the storage-bucket path; assigned by {@code getOutputDir}. */
    private String outputPath;
    /** Dataplex asset resolved from the configuration in {@code prepareRun}. */
    private com.google.cloud.dataplex.v1.Asset asset;
    /** Existing Dataplex entity for the target table, if one was found; {@code null} otherwise. */
    private Entity entityBean = null;

    /**
     * Creates the sink for the given plugin configuration.
     *
     * @param config plugin configuration; stored as-is and read by every other method
     */
    public DataplexBatchSink(DataplexBatchSinkConfig config) {
        this.config = config;
    }

    /**
     * Validates the sink configuration when the pipeline is configured.
     *
     * <p>Validation is skipped entirely when the connection cannot be used yet, the service
     * account type is unset, a file-path service account is configured but the automatic service
     * account is unavailable, or the project cannot be resolved. Otherwise the asset
     * configuration is validated first, followed by the checks specific to the asset type.
     * An {@link IOException} is reported through the failure collector rather than thrown.
     *
     * @param pipelineConfigurer configurer providing the stage configurer and failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector failures = stageConfigurer.getFailureCollector();
        GoogleCredentials serviceAccountCredentials = this.config.validateAndGetServiceAccountCredentials(failures);
        try (DataplexServiceClient client = DataplexUtil.getDataplexServiceClient(serviceAccountCredentials)) {
            // Same short-circuit order as before: connection, account type, file path, project.
            boolean skipValidation = !this.config.getConnection().canConnect()
                || this.config.getServiceAccountType() == null
                || (this.config.isServiceAccountFilePath() && this.config.autoServiceAccountUnavailable())
                || this.config.tryGetProject() == null;
            if (skipValidation) {
                return;
            }

            io.cdap.cdap.api.data.schema.Schema incomingSchema = stageConfigurer.getInputSchema();
            io.cdap.cdap.api.data.schema.Schema declaredSchema = this.config.getSchema(failures);
            this.config.validateAssetConfiguration(failures, client);

            if (this.config.getAssetType().equals("BIGQUERY_DATASET")) {
                this.config.validateBigQueryDataset(incomingSchema, declaredSchema, failures, client);
            } else if (this.config.getAssetType().equals("STORAGE_BUCKET")) {
                this.config.validateStorageBucket(failures);
                this.config.validateFormatForStorageBucket(pipelineConfigurer, failures);
                if (this.config.isUpdateDataplexMetadata()) {
                    this.prepareDataplexMetadataUpdate(failures, declaredSchema);
                }
            }
        }
        catch (IOException ioFailure) {
            failures.addFailure(ioFailure.getMessage(), null);
        }
    }

    /**
     * Validates the configuration again at run time, resolves the Dataplex asset and registers
     * the output for the configured asset type.
     *
     * <p>For storage-bucket assets with metadata updates enabled, the schema handed to the
     * metadata preparation falls back to the stage's input schema when no schema is configured,
     * matching what {@code onRunFinish} and {@code prepareRunStorageBucket} already do. Without
     * the fallback a missing configured schema was dereferenced as {@code null}.
     *
     * @param context batch sink context of the current run
     * @throws Exception if validation or output preparation fails
     */
    @Override
    public void prepareRun(BatchSinkContext context) throws Exception {
        FailureCollector collector = context.getFailureCollector();
        GoogleCredentials credentials = this.config.validateAndGetServiceAccountCredentials(collector);
        try (DataplexServiceClient dataplexServiceClient = DataplexUtil.getDataplexServiceClient(credentials)) {
            this.config.validateAssetConfiguration(collector, dataplexServiceClient);
            AssetName assetName = AssetName.newBuilder()
                .setProject(this.config.tryGetProject())
                .setLocation(this.config.getLocation())
                .setLake(this.config.getLake())
                .setZone(this.config.getZone())
                .setAsset(this.config.getAsset())
                .build();
            this.asset = dataplexServiceClient.getAsset(assetName);
            if (this.config.getAssetType().equals("BIGQUERY_DATASET")) {
                this.config.validateBigQueryDataset(context.getInputSchema(), context.getOutputSchema(), collector, dataplexServiceClient);
                this.prepareRunBigQueryDataset(context);
            }
            if (this.config.getAssetType().equals("STORAGE_BUCKET")) {
                this.config.validateStorageBucket(collector);
                if (this.config.isUpdateDataplexMetadata()) {
                    io.cdap.cdap.api.data.schema.Schema schema = this.config.getSchema(collector);
                    if (schema == null) {
                        // No schema configured: use the schema of the incoming records instead.
                        schema = context.getInputSchema();
                    }
                    this.prepareDataplexMetadataUpdate(collector, schema);
                }
                this.prepareRunStorageBucket(context);
            }
        }
    }

    /**
     * Wraps each record in the key/value shape used by the selected output.
     *
     * <p>For {@code BIGQUERY_DATASET} assets the record is emitted as the key with a
     * {@link NullWritable} value; for every other asset type the record is the value and the key
     * is a {@link NullWritable}.
     *
     * @param input   record to write
     * @param emitter receives exactly one key/value pair per record
     */
    @Override
    public void transform(StructuredRecord input, Emitter<KeyValue<Object, Object>> emitter) {
        // Explicit type arguments replace the raw KeyValue and Object casts, which did not
        // type-check against Emitter<KeyValue<Object, Object>>.
        if (this.config.getAssetType().equalsIgnoreCase("BIGQUERY_DATASET")) {
            emitter.emit(new KeyValue<Object, Object>(input, NullWritable.get()));
        } else {
            emitter.emit(new KeyValue<Object, Object>(NullWritable.get(), input));
        }
    }

    /**
     * Finishes a run: emits record-count metrics and performs the post-run work of the
     * configured asset type.
     *
     * <p>Storage bucket: emits metrics and, when the run succeeded and metadata updates are
     * enabled, creates or updates the Dataplex entity for {@code gs://<bucket>/<table>}.
     *
     * <p>BigQuery dataset: deletes the temporary GCS directory and then emits metrics. The two
     * steps are independent, so a failed cleanup no longer suppresses the metrics or gets
     * reported as a metrics problem. Neither step throws; failures are logged as warnings.
     *
     * @param succeeded whether the run completed successfully
     * @param context   batch sink context of the finished run
     * @throws RuntimeException if the Dataplex entity cannot be created or updated
     */
    @Override
    public void onRunFinish(boolean succeeded, BatchSinkContext context) {
        if (this.config.getAssetType().equalsIgnoreCase("STORAGE_BUCKET")) {
            this.emitMetricsForStorageBucket(succeeded, context);
            if (succeeded && this.config.isUpdateDataplexMetadata()) {
                FailureCollector collector = context.getFailureCollector();
                GoogleCredentials googleCredentials = this.config.validateAndGetServiceAccountCredentials(collector);
                io.cdap.cdap.api.data.schema.Schema schema = this.config.getSchema(collector);
                if (schema == null) {
                    schema = context.getInputSchema();
                }
                String bucketName = this.asset.getResourceSpec().getName();
                String assetFullPath = "gs://" + bucketName + "/" + this.config.getTable();
                try {
                    // configureDataplexMetadataUpdate opens its own metadata client, so no
                    // DataplexServiceClient is needed here.
                    this.configureDataplexMetadataUpdate(googleCredentials, assetFullPath, StorageSystem.CLOUD_STORAGE, schema);
                }
                catch (ApiException | IOException e) {
                    throw new RuntimeException(String.format("Unable create entity for bucket %s. ", bucketName) + "See error details for more information.", e);
                }
            }
            return;
        }

        // NOTE(review): this treats the run UUID as a bucket name, while prepareRunBigQueryDataset
        // stages data in the configured bucket or "dataplex-<runUUID>" — confirm this path is the
        // one that actually holds the temporary files.
        Path gcsPath = new Path("gs://" + this.runUUID);
        if (this.baseConfiguration != null) {
            // baseConfiguration is only set once prepareRun got far enough; nothing to clean otherwise.
            try {
                FileSystem fs = gcsPath.getFileSystem(this.baseConfiguration);
                if (fs.exists(gcsPath)) {
                    fs.delete(gcsPath, true);
                    LOG.debug("Deleted temporary directory '{}'", gcsPath);
                }
            }
            catch (IOException | RuntimeException e) {
                LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
            }
        }
        try {
            this.emitMetricsForBigQueryDataset(succeeded, context);
        }
        catch (Exception exception) {
            LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.", exception);
        }
    }

    /**
     * Prepares a run that writes to a BigQuery dataset asset.
     *
     * <p>Builds the base Hadoop configuration, derives the dataset name and its project from the
     * last and third-from-last segments of the asset's resource name, resolves the staging bucket
     * (falling back to {@code dataplex-<runUUID>}), creates the required resources unless preview
     * is enabled, and finally registers the output.
     *
     * @param context batch sink context of the current run
     * @throws Exception if any of the preparation steps fails
     */
    private void prepareRunBigQueryDataset(BatchSinkContext context) throws Exception {
        FailureCollector failures = context.getFailureCollector();
        GoogleCredentials creds = this.config.getCredentials(failures);
        String project = this.config.getProject();

        String cmekKeyArgument = context.getArguments().get("gcp.cmek.key.name");
        CryptoKeyName cmekKeyName = Strings.isNullOrEmpty(cmekKeyArgument) ? null : CryptoKeyName.parse(cmekKeyArgument);
        this.baseConfiguration = this.getBaseConfiguration(cmekKeyName);

        String[] resourceSegments = this.asset.getResourceSpec().getName().split("/");
        String datasetName = resourceSegments[resourceSegments.length - 1];
        String datasetProject = resourceSegments[resourceSegments.length - 3];

        this.bigQuery = GCPUtils.getBigQuery(datasetProject, creds);
        DatasetId datasetId = DatasetId.of(datasetProject, datasetName);
        Dataset dataset = this.bigQuery.getDataset(datasetId, new BigQuery.DatasetOption[0]);

        String configuredBucket = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), this.config.getLocation(), dataset, null);
        String fallbackBucketName = "dataplex-" + this.runUUID;
        String bucket = BigQuerySinkUtils.configureBucket(this.baseConfiguration, configuredBucket, fallbackBucketName);
        if (!context.isPreviewEnabled()) {
            BigQuerySinkUtils.createResources(this.bigQuery, GCPUtils.getStorage(project, creds), DatasetId.of(datasetProject, datasetName), bucket, this.config.getLocation(), cmekKeyName);
        }

        // Prefer the configured schema; otherwise write with the schema of the incoming records.
        io.cdap.cdap.api.data.schema.Schema configSchema = this.config.getSchema(failures);
        io.cdap.cdap.api.data.schema.Schema outputSchema = configSchema;
        if (configSchema == null) {
            outputSchema = context.getInputSchema();
        }

        this.configureTable(outputSchema, datasetName, datasetProject, failures);
        this.configureBigQuerySink();
        String outputName = this.config.getReferenceName(BigQueryUtil.getFQN(datasetProject, datasetName, this.config.getTable()));
        this.initOutput(context, this.bigQuery, outputName, this.config.getTable(), outputSchema, bucket, failures, datasetName, datasetProject);
    }

    /**
     * Copies the BigQuery sink options from the plugin configuration into
     * {@code baseConfiguration}. Optional settings are written only when they are non-null.
     */
    private void configureBigQuerySink() {
        Configuration conf = this.baseConfiguration;
        conf.set("cdap.bq.sink.job.id", this.runUUID.toString());

        String partitionByField = this.config.getPartitionByField();
        if (partitionByField != null) {
            conf.set("cdap.bq.sink.partition.by.field", partitionByField);
        }
        conf.setBoolean("cdap.bq.sink.require.partition.filter", this.config.isRequirePartitionField());

        String clusteringOrder = this.config.getClusteringOrder();
        if (clusteringOrder != null) {
            conf.set("cdap.bq.sink.clustering.order", clusteringOrder);
        }
        conf.set("cdap.bq.sink.operation", this.config.getOperation().name());

        String tableKey = this.config.getTableKey();
        if (tableKey != null) {
            conf.set("cdap.bq.sink.table.key", tableKey);
        }
        String dedupeBy = this.config.getDedupeBy();
        if (dedupeBy != null) {
            conf.set("cdap.bq.sink.dedupe.by", dedupeBy);
        }
        String partitionFilter = this.config.getPartitionFilter();
        if (partitionFilter != null) {
            conf.set("cdap.bq.sink.partition.filter", partitionFilter);
        }

        conf.setEnum("cdap.bq.sink.partition.type", this.config.getPartitioningType());

        // Integer-range partitioning bounds; each one is optional.
        if (this.config.getRangeStart() != null) {
            conf.setLong("cdap.bq.sink.partition.integer.range.start", this.config.getRangeStart().longValue());
        }
        if (this.config.getRangeEnd() != null) {
            conf.setLong("cdap.bq.sink.partition.integer.range.end", this.config.getRangeEnd().longValue());
        }
        if (this.config.getRangeInterval() != null) {
            conf.setLong("cdap.bq.sink.partition.integer.range.interval", this.config.getRangeInterval().longValue());
        }
    }

    /**
     * Records in {@code baseConfiguration} whether the destination table exists and which field
     * names it has.
     *
     * <p>Field names come from the existing table when there is one, otherwise from
     * {@code schema}. Nothing is written for the field list when neither source provides fields.
     *
     * @param schema         schema of the records to write; may be {@code null}
     * @param dataset        name of the BigQuery dataset
     * @param datasetProject project that owns the dataset
     * @param collector      failure collector passed to the table lookup
     */
    private void configureTable(io.cdap.cdap.api.data.schema.Schema schema, String dataset, String datasetProject, FailureCollector collector) {
        Table table = BigQueryUtil.getBigQueryTable(datasetProject, dataset, this.config.getTable(), this.config.getServiceAccount(), this.config.isServiceAccountFilePath(), collector);
        this.baseConfiguration.setBoolean("cdap.bq.sink.destination.table.exists", table != null);
        List<String> tableFieldsNames = null;
        if (table != null) {
            tableFieldsNames = Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream()
                .map(Field::getName)
                .collect(Collectors.toList());
        } else if (schema != null && schema.getFields() != null) {
            // getFields() is null-checked here just as prepareRunStorageBucket does.
            tableFieldsNames = schema.getFields().stream()
                .map(io.cdap.cdap.api.data.schema.Schema.Field::getName)
                .collect(Collectors.toList());
        }
        if (tableFieldsNames != null) {
            this.baseConfiguration.set("cdap.bq.sink.table.fields", String.join(",", tableFieldsNames));
        }
    }

    /**
     * Builds the Hadoop configuration shared by the BigQuery output.
     *
     * <p>Starts from the configuration produced by {@code BigQueryUtil.getBigQueryConfig} and adds
     * the schema-relaxation flag, the write disposition and the GCS upload chunk size.
     *
     * @param cmekKey customer-managed encryption key to use, or {@code null} for none
     * @return a new configuration instance
     * @throws IOException if the underlying BigQuery configuration cannot be created
     */
    private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKey) throws IOException {
        Configuration conf = BigQueryUtil.getBigQueryConfig(this.config.getServiceAccount(), this.config.getProject(), cmekKey, this.config.getServiceAccountType());
        conf.setBoolean("cdap.bq.sink.allow.schema.relaxation", this.config.isUpdateTableSchema());
        String writeDisposition = this.config.getWriteDisposition().name();
        conf.setStrings("mapred.bq.output.table.writedisposition", writeDisposition);
        // 8388608 bytes = 8 MiB
        conf.set("fs.gs.outputstream.upload.chunk.size", "8388608");
        return conf;
    }

    /**
     * Configures and registers the BigQuery output for one table.
     *
     * <p>Works on a copy of {@code baseConfiguration}: resolves the table fields, points the output
     * at a temporary GCS path derived from the bucket, run UUID and table name, records lineage
     * and finally adds the output to the context.
     *
     * @param context        batch sink context that receives the output
     * @param bigQuery       BigQuery client used to look up fields and the dataset location
     * @param outputName     name under which the output is registered
     * @param tableName      destination table
     * @param tableSchema    schema of the records to write; may be {@code null}
     * @param bucket         staging bucket
     * @param collector      failure collector passed to the field lookup
     * @param dataset        name of the BigQuery dataset
     * @param datasetProject project that owns the dataset
     * @throws IOException if the output cannot be configured
     */
    protected void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String tableName, @Nullable io.cdap.cdap.api.data.schema.Schema tableSchema, String bucket, FailureCollector collector, String dataset, String datasetProject) throws IOException {
        LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);

        List<BigQueryTableFieldSchema> tableFields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema, this.config.isUpdateTableSchema(), datasetProject, dataset, this.config.isTruncateTable(), collector);

        Configuration outputConf = new Configuration(this.baseConfiguration);
        DatasetId datasetId = DatasetId.of(datasetProject, dataset);
        String stagingPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, this.runUUID.toString(), tableName);
        BigQuerySinkUtils.configureOutput(outputConf, datasetId, tableName, stagingPath, tableFields);

        List<String> fieldNames = tableFields.stream()
            .map(fieldSchema -> fieldSchema.getName())
            .collect(Collectors.toList());

        // Lineage is recorded against the fully-qualified name built from the configured table.
        String fqn = BigQueryUtil.getFQN(datasetProject, dataset, this.config.getTable());
        String datasetLocation = bigQuery.getDataset(datasetId, new BigQuery.DatasetOption[0]).getLocation();
        Asset lineageAsset = Asset.builder(this.config.getReferenceName(fqn))
            .setFqn(fqn)
            .setLocation(datasetLocation)
            .build();
        BigQuerySinkUtils.recordLineage(context, lineageAsset, tableSchema, fieldNames, null);

        outputConf.set("dataplexsink.assettype", "BIGQUERY_DATASET");
        OutputFormatProvider provider = new DataplexOutputFormatProvider(outputConf, tableSchema, null);
        context.addOutput(Output.of(outputName, provider));
    }

    /**
     * Emits the number of rows affected by this run's BigQuery job as the
     * {@code records.updated} metric.
     *
     * <p>The metric API takes an {@code int}, so the row total is emitted in increments of at
     * most {@link Integer#MAX_VALUE}, with the number of increments capped. Nothing is emitted
     * for failed runs or when the job cannot be found.
     *
     * @param succeeded whether the run completed successfully
     * @param context   batch sink context providing the stage metrics
     */
    void emitMetricsForBigQueryDataset(boolean succeeded, BatchSinkContext context) {
        if (!succeeded) {
            return;
        }
        Job job = this.bigQuery.getJob(this.getJobId(), new BigQuery.JobOption[0]);
        if (job == null) {
            LOG.warn("Unable to find BigQuery job. No metric will be emitted for the number of affected rows.");
            return;
        }
        long remaining = this.getTotalRows(job);
        LOG.info("Job {} affected {} rows", job.getJobId(), remaining);

        final int maxIncrements = 10000;
        long fullIncrements = remaining / Integer.MAX_VALUE;
        if (fullIncrements > maxIncrements) {
            LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
        }
        fullIncrements = Math.min(fullIncrements, maxIncrements);

        for (long emitted = 0L; emitted <= fullIncrements && remaining > 0L; ++emitted) {
            int increment = (int) Math.min(remaining, (long) Integer.MAX_VALUE);
            context.getMetrics().count(RECORDS_UPDATED_METRIC, increment);
            remaining -= increment;
        }
    }

    /**
     * Builds the id of this run's BigQuery job from the configured location and the run UUID.
     *
     * @return job id whose job name is the run UUID
     */
    private JobId getJobId() {
        String jobName = this.runUUID.toString();
        String jobLocation = this.config.getLocation();
        return JobId.newBuilder()
            .setLocation(jobLocation)
            .setJob(jobName)
            .build();
    }

    /**
     * Returns the number of rows the given job wrote or modified.
     *
     * <p>Load jobs report their output rows and query jobs their DML-affected rows. Both values
     * are boxed and may be absent; a missing value or missing statistics yields {@code 0} instead
     * of failing on unboxing. Any other job type also yields {@code 0}, with a warning.
     *
     * @param queryJob the finished BigQuery job
     * @return affected row count, or {@code 0} when it is unavailable
     */
    private long getTotalRows(Job queryJob) {
        JobConfiguration.Type type = queryJob.getConfiguration().getType();
        Long rows;
        if (type == JobConfiguration.Type.LOAD) {
            JobStatistics.LoadStatistics loadStatistics = queryJob.getStatistics();
            rows = loadStatistics == null ? null : loadStatistics.getOutputRows();
        } else if (type == JobConfiguration.Type.QUERY) {
            JobStatistics.QueryStatistics queryStatistics = queryJob.getStatistics();
            rows = queryStatistics == null ? null : queryStatistics.getNumDmlAffectedRows();
        } else {
            LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected rows.");
            return 0L;
        }
        if (rows == null) {
            LOG.debug("BigQuery job {} did not report a row count.", queryJob.getJobId());
            return 0L;
        }
        return rows;
    }

    /**
     * Prepares a run that writes files to a storage-bucket asset.
     *
     * <p>Validates the output format, makes sure the bucket named by the asset's resource spec
     * exists (creating it when the lookup returns nothing), computes the output directory,
     * records lineage and registers the file output.
     *
     * @param context batch sink context of the current run
     * @throws Exception if the format is invalid or the bucket cannot be accessed
     */
    private void prepareRunStorageBucket(BatchSinkContext context) throws Exception {
        ValidatingOutputFormat outputFormat = this.validateOutputFormatForRun(context);
        FailureCollector failures = context.getFailureCollector();

        String cmekKeyArgument = context.getArguments().get("gcp.cmek.key.name");
        CryptoKeyName cmekKeyName = Strings.isNullOrEmpty(cmekKeyArgument) ? null : CryptoKeyName.parse(cmekKeyArgument);

        GoogleCredentials creds = this.config.getCredentials(failures);
        Storage storage = GCPUtils.getStorage(this.config.getProject(), creds);

        String bucketName = this.asset.getResourceSpec().getName();
        Bucket existingBucket;
        try {
            existingBucket = storage.get(bucketName, new Storage.BucketGetOption[0]);
        }
        catch (StorageException lookupFailure) {
            throw new RuntimeException(String.format("Unable to access or create bucket %s. ", bucketName) + "Ensure you entered the correct bucket path and have permissions for it.", lookupFailure);
        }
        if (existingBucket == null) {
            GCPUtils.createBucket(storage, bucketName, this.config.getLocation(), cmekKeyName);
        }

        String outputDir = this.getOutputDir(context.getLogicalStartTime());
        Map<String, String> outputProperties = this.getStorageBucketOutputProperties(outputFormat, outputDir);

        // Prefer the configured schema; otherwise use the schema of the incoming records.
        io.cdap.cdap.api.data.schema.Schema schema = this.config.getSchema(failures);
        if (schema == null) {
            schema = context.getInputSchema();
        }

        Asset lineageAsset = Asset.builder(this.config.getReferenceName(outputDir))
            .setFqn(outputDir)
            .setLocation(this.config.getLocation())
            .build();
        LineageRecorder lineageRecorder = new LineageRecorder(context, lineageAsset);
        lineageRecorder.createExternalDataset(schema);
        boolean hasFields = schema != null && schema.getFields() != null && !schema.getFields().isEmpty();
        if (hasFields) {
            List<String> fieldNames = schema.getFields().stream()
                .map(field -> field.getName())
                .collect(Collectors.toList());
            this.recordLineage(lineageRecorder, fieldNames);
        }

        OutputFormatProvider provider = new SinkOutputFormatProvider(outputFormat.getOutputFormatClassName(), outputProperties);
        context.addOutput(Output.of(this.config.getReferenceName(outputDir), provider));
    }

    /**
     * Instantiates the output format for this run and validates it against the input schema.
     *
     * @param context batch sink context of the current run
     * @return the validated output format
     * @throws Exception if the format cannot be instantiated or validation collected failures
     */
    private ValidatingOutputFormat validateOutputFormatForRun(BatchSinkContext context) throws Exception {
        FailureCollector failures = context.getFailureCollector();
        String formatName = this.config.getFormat().toString().toLowerCase(Locale.ROOT);
        ValidatingOutputFormat outputFormat = this.getOutputFormatForRun(context);
        this.config.validateOutputFormatProvider(new FormatContext(failures, context.getInputSchema()), formatName, outputFormat);
        // Surface everything collected so far before the run proceeds.
        failures.getOrThrowException();
        return outputFormat;
    }

    /**
     * Returns the file system properties for the current output path, with the content type of
     * the configured format added.
     *
     * @return mutable map of file system properties
     */
    protected Map<String, String> getFileSystemProperties() {
        Map<String, String> seed = new HashMap<String, String>();
        Map<String, String> fsProperties = GCPUtils.getFileSystemProperties(this.config.getConnection(), this.outputPath, seed);
        String contentType = this.config.getContentType(this.config.getFormat().toString());
        fsProperties.put("io.cdap.gcs.batch.sink.content.type", contentType);
        return fsProperties;
    }

    /**
     * Assembles the output properties for writing files to the storage bucket.
     *
     * <p>Starts from the format's own configuration, adds the output directories and asset type,
     * overlays the file system properties, and disables success markers (plus Parquet summary
     * metadata when the format is Parquet).
     *
     * @param validatingOutputFormat output format whose configuration seeds the result
     * @param outputDir              directory the files are written to
     * @return the combined output properties
     */
    protected Map<String, String> getStorageBucketOutputProperties(ValidatingOutputFormat validatingOutputFormat, String outputDir) {
        Map<String, String> props = new HashMap<String, String>(validatingOutputFormat.getOutputFormatConfiguration());
        props.put("mapreduce.output.fileoutputformat.outputdir", outputDir);
        props.put("dataplex.output.fileoutputformat.baseoutputdir", outputDir);
        props.put("dataplexsink.assettype", this.config.getAssetType());
        // File system properties are applied after the entries above and may override them.
        props.putAll(this.getFileSystemProperties());
        props.put("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
        boolean writesParquet = this.config.getFormat().equals(FileFormat.PARQUET);
        if (writesParquet) {
            props.put("parquet.enable.summary-metadata", "false");
        }
        return props;
    }

    /**
     * Instantiates the format plugin for the configured file format and wraps it in a
     * {@link DataplexOutputFormatProvider}.
     *
     * <p>The plugin id is the format name lower-cased with {@link Locale#ROOT}, the same way
     * {@code validateOutputFormatForRun} derives it, so the id does not depend on the JVM's
     * default locale.
     *
     * @param context batch sink context used to instantiate the format plugin
     * @return output format provider delegating to the instantiated format
     * @throws InstantiationException   if the plugin cannot be instantiated
     * @throws IllegalArgumentException if plugin properties were missing or invalid at deploy time
     */
    protected ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
        String fileFormat = this.config.getFormat().toString().toLowerCase(Locale.ROOT);
        try {
            ValidatingOutputFormat validatingOutputFormat = (ValidatingOutputFormat)context.newPluginInstance(fileFormat);
            return new DataplexOutputFormatProvider(null, null, validatingOutputFormat);
        }
        catch (InvalidPluginConfigException e) {
            // Report missing and invalid properties together in one message.
            HashSet<String> properties = new HashSet<String>(e.getMissingProperties());
            for (InvalidPluginProperty invalidProperty : e.getInvalidProperties()) {
                properties.add(invalidProperty.getName());
            }
            String errorMessage = String.format("Format '%s' cannot be used because properties %s were not provided or were invalid when the pipeline was deployed. Set the format to a different value, or re-create the pipeline with all required properties.", fileFormat, properties);
            throw new IllegalArgumentException(errorMessage, e);
        }
    }

    /**
     * Records a write operation for the given output fields.
     *
     * @param lineageRecorder recorder bound to the output asset
     * @param outputFields    names of the fields that were written
     */
    protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
        String operationName = "Write";
        String operationDescription = "Wrote to Google Cloud Storage.";
        lineageRecorder.recordWrite(operationName, operationDescription, outputFields);
    }

    /**
     * Computes the output directory for this run and remembers it in {@code outputPath}.
     *
     * <p>The result has the form {@code gs://<asset resource name>/<table>/ts=<time>/}, where
     * the time is the logical start time formatted with the configured suffix pattern, or with
     * {@code yyyy-MM-dd-HH-mm} when no suffix is configured.
     *
     * @param logicalStartTime logical start time of the run, passed to the date formatter
     * @return the output directory, ending with a slash
     */
    protected String getOutputDir(long logicalStartTime) {
        String configuredSuffix = this.config.getSuffix();
        String table = this.config.getTable();

        String pattern = configuredSuffix;
        if (Strings.isNullOrEmpty(configuredSuffix)) {
            pattern = "yyyy-MM-dd-HH-mm";
        }
        String formattedTime = new SimpleDateFormat(pattern).format(logicalStartTime);
        String partitionSegment = String.format("%s=%s", "ts", formattedTime);

        String bucketRoot = "gs://" + this.asset.getResourceSpec().getName();
        String directory = String.format("%s/%s/%s/", bucketRoot, table, partitionSegment);
        this.outputPath = directory;
        return directory;
    }

    /**
     * Emits record-count metrics from the metadata of the blobs under the output path.
     *
     * <p>Does nothing for failed runs. Any failure while reading the metadata is logged as a
     * warning and otherwise ignored.
     *
     * @param succeeded whether the run completed successfully
     * @param context   batch sink context providing the stage metrics
     */
    private void emitMetricsForStorageBucket(boolean succeeded, BatchSinkContext context) {
        if (succeeded) {
            try {
                StorageClient client = StorageClient.create(this.config.getProject(), this.config.getServiceAccount(), this.config.isServiceAccountFilePath());
                MetricsEmitter metricsEmitter = new MetricsEmitter(context.getMetrics());
                client.mapMetaDataForAllBlobs(this.outputPath, metricsEmitter::emitMetrics);
            }
            catch (Exception failure) {
                LOG.warn("Metrics for the number of affected rows in GCS Sink maybe incorrect.", failure);
            }
        }
    }

    /**
     * Checks whether a Dataplex entity can be created or updated for the target table and looks
     * up an existing entity.
     *
     * <p>Reports a failure when the schema contains a field named {@code ts}, which is the name
     * used for the time partition of the output layout. The entity id is the table name with
     * every character outside {@code [a-zA-Z0-9_]} replaced by an underscore. A 404 response
     * means that no entity exists yet and is not an error; any other API error is reported. An
     * existing entity whose schema is not user-managed is reported as well.
     *
     * @param collector failure collector that receives validation failures
     * @param schema    schema of the records to write; when it is {@code null} or has no field
     *                  list, the partition-column check is skipped instead of failing
     * @throws IOException if the metadata client cannot be created
     */
    private void prepareDataplexMetadataUpdate(FailureCollector collector, io.cdap.cdap.api.data.schema.Schema schema) throws IOException {
        List<io.cdap.cdap.api.data.schema.Schema.Field> fields = schema == null ? null : schema.getFields();
        boolean hasPartitionColumn = fields != null && fields.stream().anyMatch(field -> "ts".equals(field.getName()));
        if (hasPartitionColumn) {
            collector.addFailure(String.format("Field '%s' is used by dataplex sink to create time partitioned layout on GCS. To avoid conflict, presence of a column with the name '%s' on the input schema is not allowed.", "ts", "ts"), String.format("Remove '%s' field from the output schema or rename the '%s' field in the input schema by adding a transform step.", "ts", "ts"));
        }
        String entityID = this.config.getTable().replaceAll("[^a-zA-Z0-9_]", "_");
        try (MetadataServiceClient metadataServiceClient = DataplexUtil.getMetadataServiceClient(this.config.getCredentials(collector))) {
            String entityName = EntityName.of(this.config.tryGetProject(), this.config.getLocation(), this.config.getLake(), this.config.getZone(), entityID).toString();
            this.entityBean = metadataServiceClient.getEntity(GetEntityRequest.newBuilder()
                .setName(entityName)
                .setView(GetEntityRequest.EntityView.FULL)
                .build());
        }
        catch (ApiException e) {
            // 404 simply means the entity does not exist yet; it is created after the run.
            int statusCode = e.getStatusCode().getCode().getHttpStatusCode();
            if (statusCode != 404) {
                collector.addFailure("Unable to fetch entity information.", null);
            }
        }
        if (this.entityBean != null && !this.entityBean.getSchema().getUserManaged()) {
            collector.addFailure("Entity already exists, but the schema is not user-managed.", null);
        }
    }

    /**
     * Creates or updates the Dataplex entity describing the written table and then registers
     * partition information for it.
     *
     * <p>When an entity was found earlier ({@code entityBean} is set) it is updated using its
     * name and etag; otherwise a new entity is created under the configured zone. Exactly one of
     * the two calls is made. A partition that already exists is tolerated.
     *
     * @param credentials   credentials for the metadata client
     * @param assetFullPath data path of the entity, e.g. {@code gs://<bucket>/<table>}
     * @param storageSystem storage system the entity lives in
     * @param schema        schema converted into the entity's Dataplex schema
     * @throws IOException      if the metadata client cannot be created
     * @throws RuntimeException if the entity cannot be created or updated, or partition
     *                          information cannot be added
     */
    private void configureDataplexMetadataUpdate(GoogleCredentials credentials, String assetFullPath, StorageSystem storageSystem, io.cdap.cdap.api.data.schema.Schema schema) throws IOException {
        String entityID = this.config.getTable().replaceAll("[^a-zA-Z0-9_]", "_");
        try (MetadataServiceClient metadataServiceClient = DataplexUtil.getMetadataServiceClient(credentials)) {
            com.google.cloud.dataplex.v1.Schema dataplexSchema = DataplexUtil.getDataplexSchema(schema);
            Entity.Builder entityBuilder = Entity.newBuilder()
                .setId(entityID)
                .setAsset(this.config.getAsset())
                .setDataPath(assetFullPath)
                .setType(Entity.Type.TABLE)
                .setSystem(storageSystem)
                .setSchema(dataplexSchema)
                .setFormat(StorageFormat.newBuilder().setMimeType(DataplexUtil.getStorageFormatForEntity(this.config.getFormatStr())).build());
            if (this.entityBean != null) {
                // The entity exists: update it in place, carrying over its name and etag.
                try {
                    this.entityBean = metadataServiceClient.updateEntity(UpdateEntityRequest.newBuilder().setEntity(entityBuilder.setName(this.entityBean.getName()).setEtag(this.entityBean.getEtag()).build()).build());
                }
                catch (ApiException e) {
                    throw new RuntimeException(String.format("%s: %s", "There was a problem updating the entity for metadata updates.", e.getMessage()), e);
                }
            } else {
                try {
                    String entityParent = "projects/" + this.config.tryGetProject() + "/locations/" + this.config.getLocation() + "/lakes/" + this.config.getLake() + "/zones/" + this.config.getZone();
                    this.entityBean = metadataServiceClient.createEntity(CreateEntityRequest.newBuilder().setParent(entityParent).setEntity(entityBuilder.build()).build());
                }
                catch (ApiException e) {
                    throw new RuntimeException(String.format("%s: %s", "There was a problem creating the entity for metadata updates.", e.getMessage()), e);
                }
            }
            try {
                DataplexUtil.addPartitionInfo(this.entityBean, credentials, this.asset.getResourceSpec().getName(), this.config.getTable(), this.config.getProject());
            }
            catch (ApiException e) {
                // An already-registered partition is fine; anything else is a real failure.
                String message = e.getMessage();
                if (message == null || !message.endsWith("already exists")) {
                    throw new RuntimeException(String.format("Unable to create add partition information for %s. ", entityID), e);
                }
            }
        }
    }

    /**
     * Converts the record count found in blob metadata into increments of the
     * {@code records.updated} stage metric.
     */
    private static class MetricsEmitter {
        private final StageMetrics stageMetrics;

        private MetricsEmitter(StageMetrics stageMetrics) {
            this.stageMetrics = stageMetrics;
        }

        /**
         * Emits the record count stored under the {@code recordcount} metadata key.
         *
         * <p>The metric API takes an {@code int}, so the total is emitted in increments of at
         * most {@link Integer#MAX_VALUE}, with the number of increments capped.
         *
         * @param metaData metadata of one blob
         */
        public void emitMetrics(Map<String, String> metaData) {
            long remaining = this.extractRecordCount(metaData);
            if (remaining == 0L) {
                return;
            }
            final int maxIncrements = 10000;
            long fullIncrements = remaining / Integer.MAX_VALUE;
            if (fullIncrements > maxIncrements) {
                LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
            }
            fullIncrements = Math.min(fullIncrements, maxIncrements);

            for (long emitted = 0L; emitted <= fullIncrements && remaining > 0L; ++emitted) {
                int increment = (int) Math.min(remaining, (long) Integer.MAX_VALUE);
                this.stageMetrics.count(DataplexBatchSink.RECORDS_UPDATED_METRIC, increment);
                remaining -= increment;
            }
        }

        /**
         * Reads the record count from the metadata map.
         *
         * @param metadata metadata of one blob
         * @return the parsed count, or {@code 0} when the key is absent
         */
        private long extractRecordCount(Map<String, String> metadata) {
            String rawCount = metadata.get("recordcount");
            if (rawCount == null) {
                return 0L;
            }
            return Long.parseLong(rawCount);
        }
    }
}

