/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.spanner.sink;

import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.spanner.Instance;
import com.google.cloud.spanner.InstanceConfig;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ReplicaInfo;
import com.google.cloud.spanner.Spanner;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
import io.cdap.plugin.gcp.spanner.sink.RecordToMutationTransformer;
import io.cdap.plugin.gcp.spanner.sink.SpannerOutputFormat;
import io.cdap.plugin.gcp.spanner.sink.SpannerSinkConfig;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsink")
@Name(value="Spanner")
@Description(value="Batch sink to write to Cloud Spanner. Cloud Spanner is a fully managed, mission-critical, relational database service that offers transactional consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and automatic, synchronous replication for high availability.")
@Metadata(properties={@MetadataProperty(key="connector", value="Spanner")})
public final class SpannerSink
extends BatchSink<StructuredRecord, NullWritable, Mutation> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerSink.class);
    public static final String NAME = "Spanner";
    private static final String TABLE_NAME = "tablename";
    private final SpannerSinkConfig config;
    private RecordToMutationTransformer transformer;

    public SpannerSink(SpannerSinkConfig config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.config.validate(collector);
    }

    public void prepareRun(BatchSinkContext context) throws Exception {
        FailureCollector collector = context.getFailureCollector();
        this.config.validate(collector, context.getArguments().asMap());
        collector.getOrThrowException();
        Schema configuredSchema = this.config.getSchema(collector);
        Schema schema = configuredSchema == null ? context.getInputSchema() : configuredSchema;
        CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(this.config.cmekKey, context.getArguments().asMap(), context.getFailureCollector());
        Configuration configuration = new Configuration();
        configuration.setBoolean("is.preview.enabled", context.isPreviewEnabled());
        String location = null;
        if (cmekKeyName != null) {
            configuration.set("cmek.key", cmekKeyName.toString());
        }
        try (Spanner spanner = SpannerUtil.getSpannerService(this.config.connection.getServiceAccount(), this.config.connection.isServiceAccountFilePath(), this.config.connection.getProject());){
            Instance spannerInstance = spanner.getInstanceAdminClient().getInstance(this.config.getInstance());
            InstanceConfig instanceConfig = spanner.getInstanceAdminClient().getInstanceConfig(spannerInstance.getInstanceConfigId().getInstanceConfig());
            ReplicaInfo replica = (ReplicaInfo)instanceConfig.getReplicas().get(0);
            if (replica != null) {
                location = replica.getLocation();
            }
        }
        Asset asset = Asset.builder((String)this.config.getReferenceName()).setFqn(this.config.getFQN()).setLocation(location).build();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext)context, asset);
        lineageRecorder.createExternalDataset(schema);
        SpannerOutputFormat.configure(configuration, this.config, schema);
        context.addOutput(Output.of((String)this.config.getReferenceName(), (OutputFormatProvider)new SinkOutputFormatProvider(SpannerOutputFormat.class, configuration)));
        List fields = schema.getFields();
        if (fields != null && !fields.isEmpty()) {
            lineageRecorder.recordWrite("Write", "Wrote to Spanner table.", fields.stream().map(Schema.Field::getName).collect(Collectors.toList()));
        }
    }

    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        FailureCollector collector = context.getFailureCollector();
        this.config.validate(collector);
        collector.getOrThrowException();
        this.transformer = new RecordToMutationTransformer(this.config.getTable(), this.config.getSchema(collector));
    }

    public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Mutation>> emitter) {
        Mutation mutation = this.transformer.transform(input);
        emitter.emit((Object)new KeyValue(null, (Object)mutation));
    }

    public void destroy() {
        super.destroy();
    }
}

