/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.salesforce.plugin.source.streaming;

import com.google.common.base.Strings;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.DatasetManagementException;
import io.cdap.cdap.api.dataset.DatasetProperties;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.streaming.SalesforceStreamingSourceConfig;
import io.cdap.plugin.salesforce.plugin.source.streaming.SalesforceStreamingSourceUtil;
import java.util.List;
import java.util.stream.Collectors;
import javax.ws.rs.Path;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.tephra.TransactionFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming source plugin that emits {@link StructuredRecord}s for data updates
 * delivered through a Salesforce push topic.
 *
 * <p>At deployment time the plugin validates its configuration, makes sure the
 * push topic exists and, when the query is fully known (no macros), derives the
 * output schema from that query. At run time it records lineage and builds the
 * Spark stream via {@link SalesforceStreamingSourceUtil}.
 */
@Plugin(type="streamingsource")
@Name(value="Salesforce")
@Description(value="Streams data updates from Salesforce using Salesforce Streaming API")
public class SalesforceStreamingSource
extends StreamingSource<StructuredRecord> {
    static final String NAME = "Salesforce";
    static final String DESCRIPTION = "Streams data updates from Salesforce using Salesforce Streaming API";
    private static final Logger LOG = LoggerFactory.getLogger(SalesforceStreamingSource.class);

    /** Plugin configuration; assigned once in the constructor and never replaced. */
    private final SalesforceStreamingSourceConfig config;

    /**
     * Creates the source for the given plugin configuration.
     *
     * @param config the plugin configuration used by every lifecycle method
     */
    public SalesforceStreamingSource(SalesforceStreamingSourceConfig config) {
        this.config = config;
    }

    /**
     * Validates the configuration at deployment time, registers the external dataset
     * named after the reference name and, when possible, sets the output schema.
     *
     * <p>The schema is only derived when a query is available, neither the push topic
     * query nor the SObject name is a macro, and the config reports that a connection
     * can be attempted. Connection problems are reported through the failure collector
     * instead of being thrown.
     *
     * @param pipelineConfigurer configurer used to collect failures, create the dataset
     *                           and set the output schema
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        IdUtils.validateReferenceName(this.config.referenceName, collector);
        pipelineConfigurer.createDataset(this.config.referenceName, "externalDataset", DatasetProperties.EMPTY);
        try {
            this.config.validate(collector);
            this.config.ensurePushTopicExistAndWithCorrectFields();
            String query = this.config.getQuery();
            boolean queryIsKnown = !Strings.isNullOrEmpty(query)
                && !this.config.containsMacro("pushTopicQuery")
                && !this.config.containsMacro("sObjectName");
            // canAttemptToEstablishConnection() is evaluated last, and only when the query is known.
            if (queryIsKnown && this.config.canAttemptToEstablishConnection()) {
                Schema schema = SalesforceSchemaUtil.getSchema(this.config.getAuthenticatorCredentials(), SObjectDescriptor.fromQuery(query));
                pipelineConfigurer.getStageConfigurer().setOutputSchema(schema);
            }
        }
        catch (ConnectionException e) {
            collector.addFailure("There was issue communicating with Salesforce: " + e.getMessage(), null).withStacktrace(e.getStackTrace());
        }
    }

    /**
     * Records lineage for this source before the run starts.
     *
     * <p>Lineage is based on the stage's output schema. This stage is a source, so it is
     * not expected to have an input schema; looking up the input schema here would leave
     * the lineage branch unreachable.
     *
     * @param context the streaming source context for the run
     * @throws Exception if lineage registration fails
     */
    public void prepareRun(StreamingSourceContext context) throws Exception {
        Schema schema = context.getOutputSchema();
        if (schema != null && schema.getFields() != null) {
            this.recordLineage(context, this.config.referenceName, schema, "Read", String.format("Read from Salesforce Stream with push topic of %s.", this.config.getPushTopicName()));
        }
    }

    /**
     * Validates the configuration again and builds the stream of records.
     *
     * @param streamingContext the streaming context supplying the failure collector
     * @return the stream produced by {@link SalesforceStreamingSourceUtil}
     * @throws ConnectionException if validation cannot communicate with Salesforce
     */
    public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws ConnectionException {
        FailureCollector collector = streamingContext.getFailureCollector();
        this.config.validate(collector);
        // Abort before creating the stream if validation collected any failure.
        collector.getOrThrowException();
        return SalesforceStreamingSourceUtil.getStructuredRecordJavaDStream(streamingContext, this.config);
    }

    /**
     * Endpoint method that computes the output schema for the supplied configuration.
     *
     * <p>If a push topic with the configured name is found, its {@code Query} field is
     * used; otherwise the query from the configuration is used.
     *
     * @param config the configuration to compute the schema for (not the instance field)
     * @return the schema derived from the selected query
     * @throws Exception if connecting to Salesforce or resolving the schema fails
     */
    @Path(value="outputSchema")
    public Schema outputSchema(SalesforceStreamingSourceConfig config) throws Exception {
        AuthenticatorCredentials authenticatorCredentials = config.getAuthenticatorCredentials();
        PartnerConnection partnerConnection = new PartnerConnection(Authenticator.createConnectorConfig(authenticatorCredentials));
        SObject pushTopic = SalesforceStreamingSourceConfig.fetchPushTopicByName(partnerConnection, config.getPushTopicName());
        String query = pushTopic == null ? config.getQuery() : (String)pushTopic.getField("Query");
        return SalesforceSchemaUtil.getSchema(authenticatorCredentials, SObjectDescriptor.fromQuery(query));
    }

    /**
     * Registers dataset lineage and, when the schema has fields, a field-level read operation.
     *
     * @param context       the streaming source context used to register lineage
     * @param outputName    name under which lineage is registered
     * @param tableSchema   schema whose field names are recorded; nothing is recorded when it
     *                      or its field list is {@code null}
     * @param operationName name of the recorded read operation
     * @param description   description of the recorded read operation
     * @throws DatasetManagementException  declared by the lineage registration calls
     * @throws TransactionFailureException declared by the lineage registration calls
     */
    private void recordLineage(StreamingSourceContext context, String outputName, Schema tableSchema, String operationName, String description) throws DatasetManagementException, TransactionFailureException {
        if (tableSchema == null) {
            // SLF4J substitutes "{}" placeholders; "%s" would be logged literally.
            LOG.warn("Schema for output {} is null. Field-level lineage will not be recorded", outputName);
            return;
        }
        if (tableSchema.getFields() == null) {
            LOG.warn("Schema fields for output {} is empty. Field-level lineage will not be recorded", outputName);
            return;
        }
        context.registerLineage(outputName, tableSchema);
        List<String> fieldNames = tableSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList());
        if (!fieldNames.isEmpty()) {
            LineageRecorder lineageRecorder = new LineageRecorder((BatchContext)context, outputName);
            lineageRecorder.recordRead(operationName, description, fieldNames);
        }
    }
}

