/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher.source;

import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
import io.cdap.cdap.etl.api.streaming.StreamingStateHandler;
import io.cdap.cdap.features.Feature;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.publisher.source.GoogleSubscriberConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubMessage;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriber;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberUtil;
import io.cdap.plugin.gcp.publisher.source.SerializableFunction;
import java.util.HashMap;
import java.util.stream.Collectors;

/**
 * Streaming source plugin, registered under the name {@code GoogleSubscriber}, that
 * produces {@link StructuredRecord}s from Google Pub/Sub messages.
 *
 * <p>Subscription handling is inherited from {@link PubSubSubscriber}; this class adds
 * config validation, output-schema and lineage registration, and supplies the function
 * that maps a {@link PubSubMessage} to a {@link StructuredRecord}.
 */
@Plugin(type="streamingsource")
@Name(value="GoogleSubscriber")
@Description(value="Streaming Source to read messages from Google PubSub.")
public class GoogleSubscriber
extends PubSubSubscriber<StructuredRecord>
implements StreamingStateHandler {
    /** Plugin configuration; the same instance that is handed to the superclass. */
    private GoogleSubscriberConfig config;

    /**
     * Creates the source from its plugin configuration.
     *
     * @param config configuration for this source; also passed to the superclass constructor
     */
    public GoogleSubscriber(GoogleSubscriberConfig config) {
        super(config);
        this.config = config;
    }

    /**
     * Validates the configuration at deployment time and declares the output schema.
     *
     * <p>When the native state tracking feature flag is enabled, the pipeline additionally
     * gets {@code spark.task.maxFailures} and {@code spark.stage.maxConsecutiveAttempts}
     * set to {@code "1"}.
     *
     * @param pipelineConfigurer configurer used to report validation failures, set the
     *                           output schema and set pipeline properties
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);

        // Validation failures are reported through the stage's failure collector.
        config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
        pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());

        if (pipelineConfigurer.isFeatureEnabled(Feature.STREAMING_PIPELINE_NATIVE_STATE_TRACKING.getFeatureFlagString())) {
            // NOTE(review): presumably limits Spark task/stage retries while native state
            // tracking is active - confirm the intent against the upstream project.
            HashMap<String, String> sparkOverrides = new HashMap<>();
            sparkOverrides.put("spark.task.maxFailures", "1");
            sparkOverrides.put("spark.stage.maxConsecutiveAttempts", "1");
            pipelineConfigurer.setPipelineProperties(sparkOverrides);
        }
    }

    /**
     * Re-validates the configuration before a run and records lineage for the output schema.
     *
     * <p>Dataset lineage is always registered under the configured reference name. A
     * field-level "Read" operation is recorded only when the output schema exposes fields.
     *
     * @param context runtime context supplying the failure collector and output schema
     * @throws Exception if validation or lineage registration fails
     */
    public void prepareRun(StreamingSourceContext context) throws Exception {
        config.validate(context.getFailureCollector());

        Schema outputSchema = context.getOutputSchema();
        context.registerLineage(config.referenceName, outputSchema);

        // Nothing to record at field level when the schema has no field list.
        if (outputSchema.getFields() == null) {
            return;
        }

        // NOTE(review): the BatchContext cast is carried over from the decompiled output;
        // confirm whether it is actually required by the LineageRecorder constructor.
        new LineageRecorder((BatchContext)context, config.referenceName).recordRead(
            "Read",
            "Read from Pub/Sub",
            outputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
    }

    /**
     * Returns the function that converts a {@link PubSubMessage} into a
     * {@link StructuredRecord}, as built by {@link PubSubSubscriberUtil} from this
     * source's configuration.
     *
     * @return the message-to-record mapping function
     */
    @Override
    public SerializableFunction<PubSubMessage, StructuredRecord> getMappingFunction() {
        return PubSubSubscriberUtil.getMappingFunction(config);
    }
}

