/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.base.Strings;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Topic;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.publisher.PubSubOutputFormat;
import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;

@Plugin(type="batchsink")
@Name(value="GooglePublisher")
@Description(value="Writes to a Google Cloud Pub/Sub topic. Cloud Pub/Sub brings the scalability, flexibility, and reliability of enterprise message-oriented middleware to the cloud. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication between independently written applications")
public class GooglePublisher
extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
    private final Config config;

    public GooglePublisher(Config config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.config.validate(collector);
    }

    public void prepareRun(BatchSinkContext context) throws IOException {
        String serviceAccount;
        FailureCollector collector = context.getFailureCollector();
        this.config.validate(collector);
        TopicAdminSettings.Builder topicAdminSettings = TopicAdminSettings.newBuilder();
        Boolean isServiceAccountFilePath = this.config.isServiceAccountFilePath();
        if (isServiceAccountFilePath == null) {
            context.getFailureCollector().addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`");
            collector.getOrThrowException();
        }
        if (!Strings.isNullOrEmpty((String)(serviceAccount = this.config.getServiceAccount()))) {
            topicAdminSettings.setCredentialsProvider(() -> GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
        }
        String projectId = this.config.getProject();
        ProjectTopicName projectTopicName = ProjectTopicName.of((String)projectId, (String)this.config.topic);
        if (!context.isPreviewEnabled()) {
            try (TopicAdminClient topicAdminClient = TopicAdminClient.create((TopicAdminSettings)topicAdminSettings.build());){
                try {
                    topicAdminClient.getTopic(projectTopicName);
                }
                catch (NotFoundException e) {
                    try {
                        Topic.Builder request = Topic.newBuilder().setName(projectTopicName.toString());
                        CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(this.config.cmekKey, context.getArguments().asMap(), context.getFailureCollector());
                        context.getFailureCollector().getOrThrowException();
                        if (cmekKeyName != null) {
                            request.setKmsKeyName(cmekKeyName.toString());
                        }
                        topicAdminClient.createTopic(request.build());
                    }
                    catch (AlreadyExistsException request) {
                    }
                    catch (ApiException e1) {
                        throw new IOException(String.format("Could not auto-create topic '%s' in project '%s'. Please ensure it is created before running the pipeline, or ensure that the service account has permission to create the topic.", this.config.topic, projectId), e);
                    }
                }
            }
        }
        Schema inputSchema = context.getInputSchema();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext)context, this.config.getReferenceName());
        lineageRecorder.createExternalDataset(inputSchema);
        Configuration configuration = new Configuration();
        PubSubOutputFormat.configure(configuration, this.config);
        context.addOutput(Output.of((String)this.config.getReferenceName(), (OutputFormatProvider)new SinkOutputFormatProvider(PubSubOutputFormat.class, configuration)));
        if (inputSchema != null && inputSchema.getFields() != null && !inputSchema.getFields().isEmpty()) {
            lineageRecorder.recordWrite("Write", "Wrote to Google Cloud Pub/Sub.", inputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
        }
    }

    public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) throws Exception {
        emitter.emit((Object)new KeyValue((Object)NullWritable.get(), (Object)input));
    }

    public static class Config
    extends GCPReferenceSinkConfig {
        public static final String NAME_MESSAGE_COUNT_BATCH_SIZE = "messageCountBatchSize";
        public static final String NAME_REQUEST_THRESHOLD_KB = "requestThresholdKB";
        public static final String NAME_PUBLISH_DELAY_THRESHOLD_MILLIS = "publishDelayThresholdMillis";
        public static final String NAME_ERROR_THRESHOLD = "errorThreshold";
        public static final String NAME_RETRY_TIMEOUT_SECONDS = "retryTimeoutSeconds";
        @Description(value="Cloud Pub/Sub topic to publish records to")
        @Macro
        private String topic;
        @Macro
        @Nullable
        @Description(value="Format of the data to read. Supported formats are 'avro', 'blob', 'tsv', 'csv', 'delimited', 'json', 'parquet' and 'text'.")
        private String format;
        @Description(value="The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format is anything other than 'delimited'.")
        @Macro
        @Nullable
        private String delimiter;
        @Description(value="Maximum count of messages in a batch. The default value is 100.")
        @Macro
        @Nullable
        private Long messageCountBatchSize;
        @Description(value="Maximum size of a batch in kilo bytes. The default value is 1KB.")
        @Macro
        @Nullable
        private Long requestThresholdKB;
        @Description(value="Maximum delay in milli-seconds for publishing the batched messages. The default value is 1 ms.")
        @Macro
        @Nullable
        private Long publishDelayThresholdMillis;
        @Description(value="Maximum number of message publishing failures to tolerate per partition before the pipeline will be failed. The default value is 0.")
        @Macro
        @Nullable
        private Long errorThreshold;
        @Description(value="Maximum amount of time in seconds to spend retrying publishing failures. The default value is 30 seconds.")
        @Macro
        @Nullable
        private Integer retryTimeoutSeconds;
        @Name(value="cmekKey")
        @Macro
        @Nullable
        @Description(value="The GCP customer managed encryption key (CMEK) name used to encrypt data written to any topic created by the plugin. If the topic already exists, this is ignored. More information can be found at https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys")
        private String cmekKey;

        public Config(String referenceName, String topic, @Nullable Long messageCountBatchSize, @Nullable Long requestThresholdKB, @Nullable Long publishDelayThresholdMillis, @Nullable Long errorThreshold, @Nullable Integer retryTimeoutSeconds) {
            this.referenceName = referenceName;
            this.topic = topic;
            this.messageCountBatchSize = messageCountBatchSize;
            this.requestThresholdKB = requestThresholdKB;
            this.publishDelayThresholdMillis = publishDelayThresholdMillis;
            this.errorThreshold = errorThreshold;
            this.retryTimeoutSeconds = retryTimeoutSeconds;
        }

        @Override
        public void validate(FailureCollector collector, Map<String, String> arguments) {
            super.validate(collector, arguments);
            if (!this.containsMacro(NAME_MESSAGE_COUNT_BATCH_SIZE) && this.messageCountBatchSize != null && this.messageCountBatchSize < 1L) {
                collector.addFailure("Invalid maximum count of messages in a batch.", "Ensure the value is a positive number.").withConfigProperty(NAME_MESSAGE_COUNT_BATCH_SIZE);
            }
            if (!this.containsMacro(NAME_REQUEST_THRESHOLD_KB) && this.requestThresholdKB != null && this.requestThresholdKB < 1L) {
                collector.addFailure("Invalid maximum batch size.", "Ensure the value is a positive number.").withConfigProperty(NAME_REQUEST_THRESHOLD_KB);
            }
            if (!this.containsMacro(NAME_PUBLISH_DELAY_THRESHOLD_MILLIS) && this.publishDelayThresholdMillis != null && this.publishDelayThresholdMillis < 1L) {
                collector.addFailure("Invalid delay threshold for publishing a batch.", "Ensure the value is a positive number.").withConfigProperty(NAME_PUBLISH_DELAY_THRESHOLD_MILLIS);
            }
            if (!this.containsMacro(NAME_ERROR_THRESHOLD) && this.errorThreshold != null && this.errorThreshold < 0L) {
                collector.addFailure("Invalid error threshold for publishing.", "Ensure the value is a positive number.").withConfigProperty(NAME_ERROR_THRESHOLD);
            }
            if (!this.containsMacro(NAME_RETRY_TIMEOUT_SECONDS) && this.retryTimeoutSeconds != null && this.retryTimeoutSeconds < 1) {
                collector.addFailure("Invalid max retry timeout for retrying failed publish.", "Ensure the value is a positive number.").withConfigProperty(NAME_RETRY_TIMEOUT_SECONDS);
            }
            if (!this.containsMacro("delimiter") && !this.containsMacro("format") && this.getFormat().equalsIgnoreCase("delimited") && this.delimiter == null) {
                collector.addFailure(String.format("Delimiter is required when format is set to %s.", this.format), "Ensure the delimiter is provided.").withConfigProperty(this.delimiter);
            }
            if (!this.containsMacro("cmekKey")) {
                CmekUtils.getCmekKey(this.cmekKey, arguments, collector);
            }
            collector.getOrThrowException();
        }

        public long getRequestBytesThreshold() {
            return this.requestThresholdKB == null ? 1024L : this.requestThresholdKB * 1024L;
        }

        public long getMessageCountBatchSize() {
            return this.messageCountBatchSize == null ? 100L : this.messageCountBatchSize;
        }

        public long getPublishDelayThresholdMillis() {
            return this.publishDelayThresholdMillis == null ? 1L : this.publishDelayThresholdMillis;
        }

        public long getErrorThreshold() {
            return this.errorThreshold == null ? 0L : this.errorThreshold;
        }

        public int getRetryTimeoutSeconds() {
            return this.retryTimeoutSeconds == null ? 30 : this.retryTimeoutSeconds;
        }

        public String getTopic() {
            return this.topic;
        }

        public String getFormat() {
            return Strings.isNullOrEmpty((String)this.format) ? "text" : this.format;
        }

        public String getDelimiter() {
            return this.delimiter;
        }

        @Nullable
        public Long getRequestThresholdKB() {
            return this.requestThresholdKB;
        }
    }
}

