/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.salesforce.plugin.source.batch;

import com.sforce.async.BulkConnection;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.MapToRecordTransformer;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceInputFormatProvider;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceMultiSourceConfig;
import io.cdap.plugin.salesforce.plugin.source.batch.SalesforceSplit;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@Plugin(type="batchsource")
@Name(value="SalesforceMultiObjects")
@Description(value="Reads multiple SObjects in Salesforce. Outputs one record for each row in each SObject, with the SObject name as a record field. Also sets a pipeline argument for each SObject read, which contains its schema.")
public class SalesforceBatchMultiSource
extends BatchSource<Schema, Map<String, String>, StructuredRecord> {
    public static final String NAME = "SalesforceMultiObjects";
    private static final String MULTI_SINK_PREFIX = "multisink.";
    private final SalesforceMultiSourceConfig config;
    private MapToRecordTransformer transformer;
    private Set<String> jobIds = new HashSet<String>();
    private AuthenticatorCredentials authenticatorCredentials;

    public SalesforceBatchMultiSource(SalesforceMultiSourceConfig config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        this.config.validate(stageConfigurer.getFailureCollector());
        if (!this.config.canAttemptToEstablishConnection()) {
            return;
        }
        this.config.validateSObjects(stageConfigurer.getFailureCollector());
        stageConfigurer.setOutputSchema(null);
    }

    public void prepareRun(BatchSourceContext context) throws ConnectionException {
        FailureCollector collector = context.getFailureCollector();
        this.config.validate(collector);
        this.config.validateSObjects(collector);
        collector.getOrThrowException();
        List<String> queries = this.config.getQueries(context.getLogicalStartTime());
        Map<String, Schema> schemas = this.config.getSObjectsSchemas(queries);
        SettableArguments arguments = context.getArguments();
        schemas.forEach((sObjectName, sObjectSchema) -> arguments.set(MULTI_SINK_PREFIX + sObjectName, sObjectSchema.toString()));
        String sObjectNameField = this.config.getSObjectNameField();
        this.authenticatorCredentials = this.config.getAuthenticatorCredentials();
        BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(this.authenticatorCredentials);
        List<SalesforceSplit> querySplits = queries.parallelStream().map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, this.config.getOperation())).flatMap(Collection::stream).collect(Collectors.toList());
        querySplits.parallelStream().forEach(salesforceSplit -> this.jobIds.add(salesforceSplit.getJobId()));
        context.setInput(Input.of((String)this.config.referenceName, (InputFormatProvider)new SalesforceInputFormatProvider(this.config, this.getSchemaWithNameField(sObjectNameField, schemas), querySplits, sObjectNameField)));
    }

    public void onRunFinish(boolean succeeded, BatchSourceContext context) {
        super.onRunFinish(succeeded, (BatchContext)context);
        SalesforceSplitUtil.closeJobs(this.jobIds, this.authenticatorCredentials);
    }

    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        this.transformer = new MapToRecordTransformer();
    }

    public void transform(KeyValue<Schema, Map<String, String>> input, Emitter<StructuredRecord> emitter) throws Exception {
        StructuredRecord record = this.transformer.transform((Schema)input.getKey(), (Map)input.getValue());
        emitter.emit((Object)record);
    }

    private Map<String, String> getSchemaWithNameField(String sObjectNameField, Map<String, Schema> schemas) {
        return schemas.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> this.getSchemaString(sObjectNameField, (Schema)entry.getValue()), (o, n) -> n));
    }

    private String getSchemaString(String sObjectNameField, Schema schema) {
        if (schema.getType() != Schema.Type.RECORD || schema.getFields() == null) {
            throw new IllegalArgumentException(String.format("Invalid schema '%s'", schema));
        }
        ArrayList<Schema.Field> fields = new ArrayList<Schema.Field>(schema.getFields());
        fields.add(Schema.Field.of((String)sObjectNameField, (Schema)Schema.of((Schema.Type)Schema.Type.STRING)));
        return Schema.recordOf((String)Objects.requireNonNull(schema.getRecordName()), fields).toString();
    }
}

