/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.gcs.sink;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.format.plugin.AbstractFileSink;
import io.cdap.plugin.format.plugin.FileSinkProperties;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.Formats;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.StorageClient;
import io.cdap.plugin.gcp.gcs.sink.GCSOutputFormatProvider;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsink")
@Name(value="GCS")
@Description(value="Writes records to one or more files in a directory on Google Cloud Storage.")
@Metadata(properties={@MetadataProperty(key="connector", value="GCS")})
public class GCSBatchSink
extends AbstractFileSink<GCSBatchSinkConfig> {
    public static final String NAME = "GCS";
    private static final Logger LOG = LoggerFactory.getLogger(GCSBatchSink.class);
    public static final String RECORD_COUNT = "recordcount";
    private static final String RECORDS_UPDATED_METRIC = "records.updated";
    public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput";
    public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename";
    public static final String CONTENT_TYPE = "io.cdap.gcs.batch.sink.content.type";
    private final GCSBatchSinkConfig config;
    private String outputPath;
    private Asset asset;

    public GCSBatchSink(GCSBatchSinkConfig config) {
        super((PluginConfig)config);
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
    }

    public ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pipelineConfigurer) {
        ValidatingOutputFormat delegate = super.getValidatingOutputFormat(pipelineConfigurer);
        return new GCSOutputFormatProvider(delegate);
    }

    public ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
        ValidatingOutputFormat outputFormatForRun = super.getOutputFormatForRun(context);
        return new GCSOutputFormatProvider(outputFormatForRun);
    }

    public void prepareRun(BatchSinkContext context) throws Exception {
        String location;
        Bucket bucket;
        FailureCollector collector = context.getFailureCollector();
        CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(this.config.cmekKey, context.getArguments().asMap(), collector);
        collector.getOrThrowException();
        Boolean isServiceAccountFilePath = this.config.connection.isServiceAccountFilePath();
        if (isServiceAccountFilePath == null) {
            collector.addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`");
            collector.getOrThrowException();
            return;
        }
        GoogleCredentials credentials = this.config.connection.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(this.config.connection.getServiceAccount(), isServiceAccountFilePath);
        Storage storage = GCPUtils.getStorage(this.config.connection.getProject(), (Credentials)credentials);
        try {
            bucket = storage.get(this.config.getBucket(), new Storage.BucketGetOption[0]);
        }
        catch (StorageException e) {
            throw new RuntimeException(String.format("Unable to access or create bucket %s. ", this.config.getBucket()) + "Ensure you entered the correct bucket path and have permissions for it.", e);
        }
        if (bucket != null) {
            location = bucket.getLocation();
        } else {
            GCPUtils.createBucket(storage, this.config.getBucket(), this.config.getLocation(), cmekKeyName);
            location = this.config.getLocation();
        }
        this.outputPath = this.getOutputDir(context);
        this.asset = Asset.builder((String)this.config.getReferenceName()).setFqn(this.config.getPath()).setLocation(location).build();
        super.prepareRun(context);
    }

    protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
        Map<String, String> properties = GCPUtils.getFileSystemProperties(this.config.connection, this.config.getPath(), new HashMap<String, String>());
        properties.put(CONTENT_TYPE, this.config.getContentType());
        properties.putAll(this.config.getFileSystemProperties());
        String outputFileBaseName = this.config.getOutputFileNameBase();
        if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
            return properties;
        }
        properties.put(AVRO_NAMED_OUTPUT, outputFileBaseName);
        properties.put(COMMON_NAMED_OUTPUT, outputFileBaseName);
        return properties;
    }

    protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
        return new LineageRecorder((BatchContext)context, this.asset);
    }

    protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
        lineageRecorder.recordWrite("Write", "Wrote to Google Cloud Storage.", outputFields);
    }

    public void onRunFinish(boolean succeeded, BatchSinkContext context) {
        super.onRunFinish(succeeded, (BatchContext)context);
        this.emitMetrics(succeeded, context);
    }

    private void emitMetrics(boolean succeeded, BatchSinkContext context) {
        if (!succeeded) {
            return;
        }
        try {
            StorageClient storageClient = StorageClient.create(this.config.connection);
            storageClient.mapMetaDataForAllBlobs(this.getPrefixPath(), new MetricsEmitter(context.getMetrics())::emitMetrics);
        }
        catch (Exception e) {
            LOG.warn("Metrics for the number of affected rows in GCS Sink maybe incorrect.", (Throwable)e);
        }
    }

    private String getPrefixPath() {
        String filenameBase = this.getFilenameBase();
        if (filenameBase == null) {
            return this.outputPath;
        }
        String outputPathPrefix = this.outputPath.endsWith("/") ? this.outputPath.substring(0, this.outputPath.length() - 1) : this.outputPath;
        return String.format("%s/%s-", outputPathPrefix, filenameBase);
    }

    @Nullable
    private String getFilenameBase() {
        String outputFileBaseName = this.config.getOutputFileNameBase();
        if (outputFileBaseName != null && !outputFileBaseName.isEmpty()) {
            return outputFileBaseName;
        }
        Map<String, String> fileSystemProperties = this.config.getFileSystemProperties();
        if (fileSystemProperties.containsKey(AVRO_NAMED_OUTPUT) && FileFormat.AVRO.name().toLowerCase().equals(this.config.getFormatName())) {
            return fileSystemProperties.get(AVRO_NAMED_OUTPUT);
        }
        if (fileSystemProperties.containsKey(COMMON_NAMED_OUTPUT)) {
            return fileSystemProperties.get(COMMON_NAMED_OUTPUT);
        }
        return null;
    }

    public static class GCSBatchSinkConfig
    extends PluginConfig
    implements FileSinkProperties {
        public static final String NAME_PATH = "path";
        private static final String NAME_SUFFIX = "suffix";
        private static final String NAME_FORMAT = "format";
        private static final String NAME_SCHEMA = "schema";
        private static final String NAME_LOCATION = "location";
        private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
        private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
        private static final String NAME_CONTENT_TYPE = "contentType";
        private static final String NAME_CUSTOM_CONTENT_TYPE = "customContentType";
        private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
        private static final String CONTENT_TYPE_OTHER = "other";
        private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
        private static final String CONTENT_TYPE_APPLICATION_AVRO = "application/avro";
        private static final String CONTENT_TYPE_APPLICATION_CSV = "application/csv";
        private static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
        private static final String CONTENT_TYPE_TEXT_CSV = "text/csv";
        private static final String CONTENT_TYPE_TEXT_TSV = "text/tab-separated-values";
        private static final String FORMAT_AVRO = "avro";
        private static final String FORMAT_CSV = "csv";
        private static final String FORMAT_JSON = "json";
        private static final String FORMAT_TSV = "tsv";
        private static final String FORMAT_DELIMITED = "delimited";
        private static final String FORMAT_ORC = "orc";
        private static final String FORMAT_PARQUET = "parquet";
        public static final String NAME_CMEK_KEY = "cmekKey";
        private static final String SCHEME = "gs://";
        @Name(value="path")
        @Description(value="The path to write to. For example, gs://<bucket>/path/to/directory")
        @Macro
        private String path;
        @Description(value="The time format for the output directory that will be appended to the path. For example, the format 'yyyy-MM-dd-HH-mm' will result in a directory of the form '2015-01-01-20-42'. If not specified, nothing will be appended to the path.")
        @Nullable
        @Macro
        private String suffix;
        @Macro
        @Description(value="The format to write in. The format must be one of 'json', 'avro', 'parquet', 'csv', 'tsv', or 'delimited'.")
        protected String format;
        @Description(value="The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format is anything other than 'delimited'.")
        @Macro
        @Nullable
        private String delimiter;
        @Macro
        @Nullable
        @Description(value="Whether a header should be written to each output file. This only applies to the delimited, csv, and tsv formats.")
        private Boolean writeHeader;
        @Description(value="The schema of the data to write. The 'avro' and 'parquet' formats require a schema but other formats do not.")
        @Macro
        @Nullable
        private String schema;
        @Name(value="location")
        @Macro
        @Nullable
        @Description(value="The location where the gcs bucket will get created. This value is ignored if the bucket already exists")
        protected String location;
        @Macro
        @Description(value="The Content Type property is used to indicate the media type of the resource.Defaults to 'application/octet-stream'.")
        @Nullable
        protected String contentType;
        @Macro
        @Description(value="The Custom Content Type is used when the value of Content-Type is set to other.User can provide specific Content-Type, different from the options in the dropdown.")
        @Nullable
        protected String customContentType;
        @Name(value="fileSystemProperties")
        @Macro
        @Nullable
        @Description(value="Advanced feature to specify any additional properties that should be used with the sink.")
        private String fileSystemProperties;
        @Name(value="outputFileNameBase")
        @Macro
        @Nullable
        @Description(value="Advanced feature to specify file output name prefix.")
        private String outputFileNameBase;
        @Name(value="cmekKey")
        @Macro
        @Nullable
        @Description(value="The GCP customer managed encryption key (CMEK) name used to encrypt data written to any bucket created by the plugin. If the bucket already exists, this is ignored. More information can be found at https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys")
        protected String cmekKey;
        @Name(value="referenceName")
        @Nullable
        @Description(value="This will be used to uniquely identify this source for lineage, annotating metadata, etc.")
        public String referenceName;
        @Name(value="useConnection")
        @Nullable
        @Description(value="Whether to use an existing connection.")
        private Boolean useConnection;
        @Name(value="connection")
        @Macro
        @Nullable
        @Description(value="The existing connection to use.")
        protected GCPConnectorConfig connection;

        public void validate() {
        }

        public void validate(FailureCollector collector) {
            this.validate(collector, Collections.emptyMap());
        }

        public void validate(FailureCollector collector, Map<String, String> arguments) {
            if (!Strings.isNullOrEmpty((String)this.referenceName)) {
                IdUtils.validateReferenceName((String)this.referenceName, (FailureCollector)collector);
            }
            ConfigUtil.validateConnection((PluginConfig)this, (Boolean)this.useConnection, (PluginConfig)this.connection, (FailureCollector)collector);
            if (!this.containsMacro(NAME_PATH)) {
                try {
                    GCSPath.from(this.path);
                }
                catch (IllegalArgumentException e) {
                    collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_PATH).withStacktrace(e.getStackTrace());
                }
            }
            if (this.suffix != null && !this.containsMacro(NAME_SUFFIX)) {
                try {
                    new SimpleDateFormat(this.suffix);
                }
                catch (IllegalArgumentException e) {
                    collector.addFailure("Invalid suffix.", "Ensure provided suffix is valid.").withConfigProperty(NAME_SUFFIX).withStacktrace(e.getStackTrace());
                }
            }
            if (!(this.containsMacro(NAME_CONTENT_TYPE) || this.containsMacro(NAME_CUSTOM_CONTENT_TYPE) || Strings.isNullOrEmpty((String)this.contentType) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER) || this.containsMacro(NAME_FORMAT) || this.contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE))) {
                this.validateContentType(collector);
            }
            if (!this.containsMacro(NAME_CMEK_KEY)) {
                this.validateCmekKey(collector, arguments);
            }
            try {
                this.getSchema();
            }
            catch (IllegalArgumentException e) {
                collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
            }
            try {
                this.getFileSystemProperties();
            }
            catch (IllegalArgumentException e) {
                collector.addFailure("File system properties must be a valid json.", null).withConfigProperty(NAME_FS_PROPERTIES).withStacktrace(e.getStackTrace());
            }
        }

        public String getReferenceName() {
            return Strings.isNullOrEmpty((String)this.referenceName) ? ReferenceNames.normalizeFqn((String)this.getPath()) : this.referenceName;
        }

        public GCSBatchSinkConfig(@Nullable String referenceName, @Nullable String project, @Nullable String fileSystemProperties, @Nullable String serviceAccountType, @Nullable String serviceFilePath, @Nullable String serviceAccountJson, @Nullable String path, @Nullable String location, @Nullable String cmekKey, @Nullable String format, @Nullable String contentType, @Nullable String customContentType) {
            this.referenceName = referenceName;
            this.fileSystemProperties = fileSystemProperties;
            this.connection = new GCPConnectorConfig(project, serviceAccountType, serviceFilePath, serviceAccountJson);
            this.path = path;
            this.location = location;
            this.cmekKey = cmekKey;
            this.format = format;
            this.contentType = contentType;
            this.customContentType = customContentType;
        }

        public void validateContentType(FailureCollector failureCollector) {
            switch (this.format) {
                case "avro": {
                    if (this.contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) break;
                    failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.", CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                    break;
                }
                case "json": {
                    if (this.contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) break;
                    failureCollector.addFailure(String.format("Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON, CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                    break;
                }
                case "csv": {
                    if (this.contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) break;
                    failureCollector.addFailure(String.format("Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                    break;
                }
                case "delimited": {
                    if (this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) break;
                    failureCollector.addFailure(String.format("Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                    break;
                }
                case "parquet": {
                    if (this.contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) break;
                    failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                    break;
                }
                case "orc": {
                    if (this.contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) break;
                    failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                    break;
                }
                case "tsv": {
                    if (this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN) || this.contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) break;
                    failureCollector.addFailure(String.format("Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
                }
            }
        }

        public String getBucket() {
            return GCSPath.from(this.path).getBucket();
        }

        public String getPath() {
            GCSPath gcsPath = GCSPath.from(this.path);
            return SCHEME + gcsPath.getBucket() + gcsPath.getUri().getPath();
        }

        public String getFormatName() {
            return Formats.getFormatPluginName(this.format);
        }

        @Nullable
        public Schema getSchema() {
            if (this.containsMacro(NAME_SCHEMA) || Strings.isNullOrEmpty((String)this.schema)) {
                return null;
            }
            try {
                return Schema.parseJson((String)this.schema);
            }
            catch (IOException e) {
                throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
            }
        }

        @Nullable
        public String getSuffix() {
            return this.suffix;
        }

        @Nullable
        public String getDelimiter() {
            return this.delimiter;
        }

        @Nullable
        public String getLocation() {
            return this.location;
        }

        @Nullable
        public String getContentType() {
            if (!Strings.isNullOrEmpty((String)this.contentType)) {
                if (this.contentType.equals(CONTENT_TYPE_OTHER)) {
                    if (Strings.isNullOrEmpty((String)this.customContentType)) {
                        return DEFAULT_CONTENT_TYPE;
                    }
                    return this.customContentType;
                }
                return this.contentType;
            }
            return DEFAULT_CONTENT_TYPE;
        }

        public Map<String, String> getFileSystemProperties() {
            if (this.fileSystemProperties == null || this.fileSystemProperties.isEmpty()) {
                return Collections.emptyMap();
            }
            try {
                return (Map)new Gson().fromJson(this.fileSystemProperties, new TypeToken<Map<String, String>>(){}.getType());
            }
            catch (JsonSyntaxException e) {
                throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e);
            }
        }

        @Nullable
        public String getOutputFileNameBase() {
            return this.outputFileNameBase;
        }

        public GCSBatchSinkConfig() {
        }

        void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
            CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(this.cmekKey, arguments, failureCollector);
            if (cmekKeyName == null || this.containsMacro(NAME_PATH) || this.containsMacro(NAME_LOCATION) || this.connection == null || !this.connection.canConnect()) {
                return;
            }
            Storage storage = GCPUtils.getStorage(this.connection.getProject(), this.connection.getCredentials(failureCollector));
            if (storage == null) {
                return;
            }
            CmekUtils.validateCmekKeyAndBucketLocation(storage, GCSPath.from(this.path), cmekKeyName, this.location, failureCollector);
        }

        public static Builder builder() {
            return new Builder();
        }

        public static class Builder {
            private String referenceName;
            private String serviceAccountType;
            private String serviceFilePath;
            private String serviceAccountJson;
            private String fileSystemProperties;
            private String project;
            private String gcsPath;
            private String cmekKey;
            private String location;
            private String format;
            private String contentType;
            private String customContentType;

            public Builder setReferenceName(@Nullable String referenceName) {
                this.referenceName = referenceName;
                return this;
            }

            public Builder setProject(@Nullable String project) {
                this.project = project;
                return this;
            }

            public Builder setServiceAccountType(@Nullable String serviceAccountType) {
                this.serviceAccountType = serviceAccountType;
                return this;
            }

            public Builder setServiceFilePath(@Nullable String serviceFilePath) {
                this.serviceFilePath = serviceFilePath;
                return this;
            }

            public Builder setServiceAccountJson(@Nullable String serviceAccountJson) {
                this.serviceAccountJson = serviceAccountJson;
                return this;
            }

            public Builder setGcsPath(@Nullable String gcsPath) {
                this.gcsPath = gcsPath;
                return this;
            }

            public Builder setCmekKey(@Nullable String cmekKey) {
                this.cmekKey = cmekKey;
                return this;
            }

            public Builder setLocation(@Nullable String location) {
                this.location = location;
                return this;
            }

            public Builder setFileSystemProperties(@Nullable String fileSystemProperties) {
                this.fileSystemProperties = fileSystemProperties;
                return this;
            }

            public Builder setFormat(@Nullable String format) {
                this.format = format;
                return this;
            }

            public Builder setContentType(@Nullable String contentType) {
                this.contentType = contentType;
                return this;
            }

            public Builder setCustomContentType(@Nullable String customContentType) {
                this.customContentType = customContentType;
                return this;
            }

            public GCSBatchSinkConfig build() {
                return new GCSBatchSinkConfig(this.referenceName, this.project, this.fileSystemProperties, this.serviceAccountType, this.serviceFilePath, this.serviceAccountJson, this.gcsPath, this.location, this.cmekKey, this.format, this.contentType, this.customContentType);
            }
        }
    }

    private static class MetricsEmitter {
        private StageMetrics stageMetrics;

        private MetricsEmitter(StageMetrics stageMetrics) {
            this.stageMetrics = stageMetrics;
        }

        public void emitMetrics(Map<String, String> metaData) {
            long totalRows = this.extractRecordCount(metaData);
            if (totalRows == 0L) {
                return;
            }
            long count = totalRows / Integer.MAX_VALUE;
            int cap = 10000;
            if (count > (long)cap) {
                LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
            }
            count = count < (long)cap ? count : (long)cap;
            int i = 0;
            while ((long)i <= count && totalRows > 0L) {
                int rowCount = totalRows < Integer.MAX_VALUE ? (int)totalRows : Integer.MAX_VALUE;
                this.stageMetrics.count(GCSBatchSink.RECORDS_UPDATED_METRIC, rowCount);
                totalRows -= (long)rowCount;
                ++i;
            }
        }

        private long extractRecordCount(Map<String, String> metadata) {
            String value = metadata.get(GCSBatchSink.RECORD_COUNT);
            return value == null ? 0L : Long.parseLong(value);
        }
    }
}

