/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.servicenow.source;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
import io.cdap.plugin.servicenow.source.util.ServiceNowTableInfo;
import io.cdap.plugin.servicenow.source.util.SourceQueryMode;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch source plugin that reads records from ServiceNow through {@link ServiceNowInputFormat}.
 *
 * <p>At configure time the plugin validates its configuration and, when possible, publishes an
 * output schema. At run time it configures the input format, stores the schema of every table
 * being read as a pipeline argument, and records field-level lineage per table.
 */
@Plugin(type="batchsource")
@Name(value="ServiceNow")
@Description(value="Reads from multiple tables in ServiceNow. Outputs one record for each row in each table, with the table name as a record field. Also sets a pipeline argument for each table read, which contains the table schema. ")
public class ServiceNowSource
extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceNowSource.class);

    /** Prefix of the pipeline argument under which each table's schema is stored. */
    private static final String SCHEMA_ARGUMENT_PREFIX = "multisink.";

    private final ServiceNowSourceConfig conf;

    /**
     * Creates the source.
     *
     * @param conf the plugin configuration
     */
    public ServiceNowSource(ServiceNowSourceConfig conf) {
        this.conf = conf;
    }

    /**
     * Validates the configuration and sets the stage's output schema.
     *
     * <p>In {@link SourceQueryMode#TABLE} mode, when the config reports that the schema should be
     * fetched, the schema of the first table returned by
     * {@link ServiceNowInputFormat#fetchTableInfo} is used. In {@link SourceQueryMode#REPORTING}
     * mode the output schema is explicitly set to {@code null}. In every other case the output
     * schema is left untouched.
     *
     * @param pipelineConfigurer configurer supplying the stage configurer and failure collector
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        // NOTE(review): this logs the config's toString(); confirm it does not expose credentials.
        LOG.debug("Validate config during `configurePipeline` stage: {}", this.conf);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        this.conf.validate(collector);
        // Stop here if validation failed so the same failures are not reported again below.
        collector.getOrThrowException();

        if (this.conf.shouldGetSchema() && this.conf.getQueryMode() == SourceQueryMode.TABLE) {
            List<ServiceNowTableInfo> tables =
                ServiceNowInputFormat.fetchTableInfo(this.conf.getQueryMode(collector), this.conf);
            if (tables == null || tables.isEmpty()) {
                // Report a proper validation failure instead of failing on an empty Optional.
                collector.addFailure(
                    "Unable to determine the output schema: no table information was returned by ServiceNow.",
                    "Verify the configured table and connection properties.");
                collector.getOrThrowException();
                return;
            }
            stageConfigurer.setOutputSchema(tables.get(0).getSchema());
        } else if (this.conf.getQueryMode() == SourceQueryMode.REPORTING) {
            stageConfigurer.setOutputSchema(null);
        }
    }

    /**
     * Validates the configuration, configures the input format, and registers the input.
     *
     * <p>For every table returned by {@link ServiceNowInputFormat#setInput} the table schema is
     * stored as a pipeline argument keyed by {@code "multisink." + tableName}, and lineage is
     * recorded for the table.
     *
     * @param context the batch source context of the current run
     */
    public void prepareRun(BatchSourceContext context) {
        FailureCollector collector = context.getFailureCollector();
        this.conf.validate(collector);
        collector.getOrThrowException();

        SourceQueryMode mode = this.conf.getQueryMode(collector);
        Configuration hConf = new Configuration();
        List<ServiceNowTableInfo> tables = ServiceNowInputFormat.setInput(hConf, mode, this.conf);

        SettableArguments arguments = context.getArguments();
        for (ServiceNowTableInfo tableInfo : tables) {
            arguments.set(SCHEMA_ARGUMENT_PREFIX + tableInfo.getTableName(), tableInfo.getSchema().toString());
            this.recordLineage(context, tableInfo);
        }

        context.setInput(Input.of(
            this.conf.getReferenceName(),
            new SourceInputFormatProvider(ServiceNowInputFormat.class, hConf)));
    }

    /**
     * Emits the value of the incoming key/value pair unchanged; the key is ignored.
     *
     * @param input the key/value pair produced by the input format
     * @param emitter the emitter receiving the record
     */
    public void transform(KeyValue<NullWritable, StructuredRecord> input, Emitter<StructuredRecord> emitter) {
        emitter.emit(input.getValue());
    }

    /**
     * Records lineage for a single table under the dataset name
     * {@code <referenceName>-<tableName>}.
     *
     * <p>A read operation is recorded only when the schema exposes at least one field.
     *
     * @param context the batch source context used to create the lineage recorder
     * @param tableInfo the table whose name and schema are recorded
     * @throws NullPointerException if the table has no schema
     */
    private void recordLineage(BatchSourceContext context, ServiceNowTableInfo tableInfo) {
        String tableName = tableInfo.getTableName();
        // Fail fast with a descriptive message before the schema is handed to the lineage recorder.
        Schema schema = Objects.requireNonNull(
            tableInfo.getSchema(), "Schema of ServiceNow table '" + tableName + "' must not be null");
        String outputName = String.format("%s-%s", this.conf.getReferenceName(), tableName);

        LineageRecorder lineageRecorder = new LineageRecorder(context, outputName);
        lineageRecorder.createExternalDataset(schema);

        List<Schema.Field> fields = schema.getFields();
        if (fields != null && !fields.isEmpty()) {
            List<String> fieldNames = fields.stream().map(Schema.Field::getName).collect(Collectors.toList());
            lineageRecorder.recordRead(
                "Read", String.format("Read from '%s' ServiceNow table.", tableName), fieldNames);
        }
    }
}

