/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.gcs.sink;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.sink.DelegatingGCSOutputFormat;
import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
import io.cdap.plugin.gcp.gcs.sink.RecordFilterOutputFormat;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsink")
@Name(value="GCSMultiFiles")
@Description(value="Writes records to one or more Avro, ORC, Parquet or Delimited format files in a directory on Google Cloud Storage.")
@Metadata(properties={@MetadataProperty(key="connector", value="GCS")})
public class GCSMultiBatchSink
extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(GCSMultiBatchSink.class);
    public static final String NAME = "GCSMultiFiles";
    private static final String TABLE_PREFIX = "multisink.";
    private static final String FORMAT_PLUGIN_ID = "format";
    private static final String SCHEMA_MACRO = "__provided_schema__";
    private final GCSMultiBatchSinkConfig config;

    /**
     * Creates the sink around the given plugin configuration.
     *
     * @param config the configuration stored for use by the configure and run phases
     */
    public GCSMultiBatchSink(GCSMultiBatchSinkConfig config) {
        this.config = config;
    }

    /**
     * Validates the sink configuration and registers the output format plugin(s) it may need.
     *
     * <p>When the {@code format} property is not a macro, the configured format is registered
     * under the plugin id {@code "format"}. Unless flexible schemas are allowed, the plugin's
     * {@code schema} property is set to the macro <code>${__provided_schema__}</code>. If the
     * plugin cannot be found, a failure is added to the collector.
     *
     * <p>When the {@code format} property is a macro, every {@link FileFormat} is registered
     * under its lower-cased name with {@code config.getRawProperties()}. A format whose
     * registration throws {@link InvalidPluginConfigException} is logged and skipped.
     *
     * @param pipelineConfigurer configurer used to register plugins and to obtain the
     *                           stage's failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.config.validate(collector);
        collector.getOrThrowException();

        PluginProperties.Builder formatPropertiesBuilder =
            PluginProperties.builder().addAll(this.config.getProperties().getProperties());
        if (!this.config.getAllowFlexibleSchema()) {
            // The schema is supplied through a macro; configureMultiSinkWithSchema sets the
            // corresponding argument before instantiating the plugin.
            formatPropertiesBuilder.add("schema", String.format("${%s}", SCHEMA_MACRO));
        }
        PluginProperties formatProperties = formatPropertiesBuilder.build();

        if (!this.config.containsMacro(FORMAT_PLUGIN_ID)) {
            String format = this.config.getFormatName();
            OutputFormatProvider outputFormatProvider = (OutputFormatProvider) pipelineConfigurer.usePlugin(
                "validatingOutputFormat", format, FORMAT_PLUGIN_ID, formatProperties);
            if (outputFormatProvider == null) {
                collector.addFailure(String.format("Could not find the '%s' output format plugin.", format), null)
                    .withPluginNotFound(FORMAT_PLUGIN_ID, format, "validatingOutputFormat");
            }
            return;
        }

        // The format is only known at run time, so register every known format.
        // NOTE(review): these are registered under the format name, whereas the run-time
        // configuration methods instantiate plugin id "format" — confirm that macro-valued
        // formats resolve correctly at run time.
        for (FileFormat f : FileFormat.values()) {
            // Locale.ROOT keeps the plugin name stable regardless of the JVM's default locale
            // (e.g. the Turkish locale lower-cases 'I' to a dotless 'ı').
            String formatName = f.name().toLowerCase(Locale.ROOT);
            try {
                pipelineConfigurer.usePlugin("validatingOutputFormat", formatName, formatName, this.config.getRawProperties());
            } catch (InvalidPluginConfigException e) {
                LOG.warn("Failed to register format '{}', which means it cannot be used when the pipeline is run. Missing properties: {}, invalid properties: {}",
                    f.name(),
                    e.getMissingProperties(),
                    e.getInvalidProperties().stream().map(InvalidPluginProperty::getName).collect(Collectors.toList()));
            }
        }
    }

    /**
     * Prepares a pipeline run: validates the configuration against the runtime arguments,
     * makes sure the target bucket exists, and registers the sink output(s).
     *
     * <p>If {@code storage.get} returns {@code null} for the configured bucket, the bucket is
     * created with the configured location and the resolved CMEK key. Outputs are then
     * registered either as one flexible-schema output or as one output per
     * {@code multisink.}-prefixed argument, depending on {@code getAllowFlexibleSchema()}.
     *
     * @param context the batch sink context for this run
     * @throws IOException declared for the per-table schema configuration path
     * @throws InstantiationException declared for output format plugin instantiation
     * @throws RuntimeException if a {@link StorageException} occurs while reading or creating
     *                          the bucket
     */
    public void prepareRun(BatchSinkContext context) throws IOException, InstantiationException {
        FailureCollector failures = context.getFailureCollector();
        this.config.validate(failures, context.getArguments().asMap());
        failures.getOrThrowException();

        Map<String, String> fileSystemProperties =
            GCPUtils.getFileSystemProperties(this.config.connection, this.config.getPath(), new HashMap<String, String>());
        // Snapshot of the arguments; the per-table path sets arguments on the context while
        // iterating over this copy.
        HashMap<String, String> argumentSnapshot = new HashMap<String, String>(context.getArguments().asMap());
        CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(this.config.cmekKey, context.getArguments().asMap(), failures);
        failures.getOrThrowException();

        Boolean serviceAccountIsFilePath = this.config.connection.isServiceAccountFilePath();
        if (serviceAccountIsFilePath == null) {
            context.getFailureCollector().addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`");
            context.getFailureCollector().getOrThrowException();
            return;
        }

        // Credentials stay null when no service account is configured.
        Credentials credentials = null;
        if (this.config.connection.getServiceAccount() != null) {
            credentials = GCPUtils.loadServiceAccountCredentials(this.config.connection.getServiceAccount(), serviceAccountIsFilePath);
        }
        Storage storage = GCPUtils.getStorage(this.config.connection.getProject(), credentials);

        try {
            boolean bucketMissing = storage.get(this.config.getBucket(), new Storage.BucketGetOption[0]) == null;
            if (bucketMissing) {
                GCPUtils.createBucket(storage, this.config.getBucket(), this.config.getLocation(), cmekKeyName);
            }
        } catch (StorageException e) {
            String message = String.format("Unable to access or create bucket %s. ", this.config.getBucket())
                + "Ensure you entered the correct bucket path and have permissions for it.";
            throw new RuntimeException(message, e);
        }

        if (!this.config.getAllowFlexibleSchema().booleanValue()) {
            this.configureMultiSinkWithSchema(context, fileSystemProperties, argumentSnapshot);
        } else {
            this.configureSchemalessMultiSink(context, fileSystemProperties, argumentSnapshot);
        }
    }

    /**
     * Emits the input record unchanged, paired with the key {@code NullWritable.get()}.
     *
     * @param input   the record to write
     * @param emitter receives the key/value pair built from {@code input}
     */
    @Override
    public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) {
        // Diamond instead of a raw KeyValue so the pair is checked against the emitter's type.
        emitter.emit(new KeyValue<>(NullWritable.get(), input));
    }

    /**
     * Registers one output per argument whose key starts with {@code multisink.}.
     *
     * <p>The remainder of the key is used as the output name and the argument value is parsed
     * as a JSON schema. Before the format plugin is instantiated, the schema is set on the
     * context arguments under {@code __provided_schema__}. Each output is written through
     * {@link RecordFilterOutputFormat}, configured with the split field, name and schema.
     *
     * @param context        the batch sink context for this run
     * @param baseProperties properties copied into every output's configuration
     * @param argumentCopy   the arguments to scan; iterated while arguments are set on
     *                       {@code context}
     * @throws IOException as declared by {@code Schema.parseJson}
     * @throws InstantiationException as declared by plugin instantiation
     */
    private void configureMultiSinkWithSchema(BatchSinkContext context, Map<String, String> baseProperties, Map<String, String> argumentCopy) throws IOException, InstantiationException {
        for (Map.Entry<String, String> entry : argumentCopy.entrySet()) {
            String argumentName = entry.getKey();
            if (argumentName.startsWith(TABLE_PREFIX)) {
                String tableName = argumentName.substring(TABLE_PREFIX.length());
                Schema tableSchema = Schema.parseJson(entry.getValue());

                // Must be set before instantiating the plugin so the schema macro can resolve.
                context.getArguments().set(SCHEMA_MACRO, tableSchema.toString());
                ValidatingOutputFormat format = (ValidatingOutputFormat) context.newPluginInstance(FORMAT_PLUGIN_ID);

                HashMap<String, String> tableProperties = new HashMap<String, String>(baseProperties);
                tableProperties.putAll(format.getOutputFormatConfiguration());
                tableProperties.putAll(
                    RecordFilterOutputFormat.configure(format.getOutputFormatClassName(), this.config.splitField, tableName, tableSchema));
                tableProperties.put(
                    "mapreduce.output.fileoutputformat.outputdir",
                    this.config.getOutputDir(context.getLogicalStartTime(), tableName));
                tableProperties.put("io.cdap.gcs.batch.sink.content.type", this.config.getContentType());

                String outputName = this.config.getReferenceName() + "_" + tableName;
                OutputFormatProvider provider =
                    new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), tableProperties);
                context.addOutput(Output.of(outputName, provider));
            }
        }
    }

    /**
     * Registers a single output, named after the reference name, that writes through
     * {@link DelegatingGCSOutputFormat}.
     *
     * <p>The delegating format is configured with the format plugin's output format class,
     * the split field, the output base directory and the suffix for this run's logical
     * start time.
     *
     * @param context        the batch sink context for this run
     * @param baseProperties properties copied into the output's configuration
     * @param argumentCopy   not read by this method
     * @throws InstantiationException as declared by plugin instantiation
     */
    private void configureSchemalessMultiSink(BatchSinkContext context, Map<String, String> baseProperties, Map<String, String> argumentCopy) throws InstantiationException {
        ValidatingOutputFormat format = (ValidatingOutputFormat) context.newPluginInstance(FORMAT_PLUGIN_ID);

        HashMap<String, String> sinkProperties = new HashMap<String, String>(baseProperties);
        sinkProperties.putAll(format.getOutputFormatConfiguration());
        sinkProperties.putAll(
            DelegatingGCSOutputFormat.configure(
                format.getOutputFormatClassName(),
                this.config.splitField,
                this.config.getOutputBaseDir(),
                this.config.getOutputSuffix(context.getLogicalStartTime())));
        sinkProperties.put("io.cdap.gcs.batch.sink.content.type", this.config.getContentType());

        String outputName = this.config.getReferenceName();
        OutputFormatProvider provider =
            new SinkOutputFormatProvider(DelegatingGCSOutputFormat.class.getName(), sinkProperties);
        context.addOutput(Output.of(outputName, provider));
    }

    /**
     * Configuration for {@link GCSMultiBatchSink}. Extends the single-file GCS sink
     * configuration with a split field, a compression codec and a flexible-schema switch.
     */
    public static class GCSMultiBatchSinkConfig extends GCSBatchSink.GCSBatchSinkConfig {
        private static final String NAME_ALLOW_FLEXIBLE_SCHEMA = "allowFlexibleSchema";

        @Description("The codec to use when writing data. The 'avro' format supports 'snappy' and 'deflate'. The parquet format supports 'snappy' and 'gzip'. Other formats do not support compression.")
        @Nullable
        private String compressionCodec;

        @Description("The name of the field that will be used to determine which directory to write to.")
        private String splitField = "tablename";

        @Name("allowFlexibleSchema")
        @Macro
        @Nullable
        @Description("Allow Flexible Schemas in output. If disabled, only records with schemas set as arguments will be processed. If enabled, all records will be written as-is.")
        private Boolean allowFlexibleSchema;

        /**
         * Builds the output directory as {@code <base dir>/<context>/<suffix>}.
         *
         * @param logicalStartTime the run's logical start time, used to format the suffix
         * @param context          the path segment placed between the base directory and the suffix
         * @return the three parts joined with {@code /}
         */
        protected String getOutputDir(long logicalStartTime, String context) {
            String baseDir = this.getOutputBaseDir();
            String suffix = this.getOutputSuffix(logicalStartTime);
            return String.format("%s/%s/%s", baseDir, context, suffix);
        }

        /**
         * @return the base output directory, which is the configured path
         */
        protected String getOutputBaseDir() {
            return this.getPath();
        }

        /**
         * Formats the logical start time using the configured suffix as a
         * {@link SimpleDateFormat} pattern.
         *
         * @param logicalStartTime the run's logical start time
         * @return the formatted suffix, or an empty string when the suffix is null or empty
         */
        protected String getOutputSuffix(long logicalStartTime) {
            if (Strings.isNullOrEmpty(this.getSuffix())) {
                return "";
            }
            return new SimpleDateFormat(this.getSuffix()).format(logicalStartTime);
        }

        /**
         * @return the configured flexible-schema flag, or {@code false} when it is not set
         */
        public Boolean getAllowFlexibleSchema() {
            if (this.allowFlexibleSchema == null) {
                return false;
            }
            return this.allowFlexibleSchema;
        }
    }
}

