/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.NotFoundException;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.CreateSnapshotRequest;
import com.google.pubsub.v1.ProjectSnapshotName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.SeekRequest;
import com.google.pubsub.v1.Snapshot;
import com.google.pubsub.v1.TopicName;
import io.cdap.cdap.etl.api.streaming.StreamingEventHandler;
import io.cdap.plugin.gcp.publisher.source.BackoffConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubMessage;
import io.cdap.plugin.gcp.publisher.source.PubSubRDD;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberUtil;
import io.cdap.plugin.gcp.publisher.source.SerializableFunction;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.reflect.ClassTag$;

public class PubSubDirectDStream<T>
extends InputDStream<T>
implements StreamingEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubDirectDStream.class);
    private static final String CDAP_PIPELINE = "cdap_pipeline";
    private static final int MAX_SNAPSHOT_ATTEMPTS = 3;
    private final Credentials credentials;
    private final PubSubSubscriberConfig config;
    private final long readDuration;
    private final io.cdap.cdap.etl.api.streaming.StreamingContext context;
    private final boolean autoAcknowledge;
    private final SerializableFunction<PubSubMessage, T> mappingFn;
    private final StreamingContext streamingContext;
    private final String pipeline;
    private final BackoffConfig backoffConfig;
    private SubscriptionAdminClient subscriptionAdminClient;
    private ProjectSnapshotName currentSnapshotName;
    private boolean takeSnapshot;

    public PubSubDirectDStream(io.cdap.cdap.etl.api.streaming.StreamingContext context, PubSubSubscriberConfig config, long readDuration, boolean autoAcknowledge, SerializableFunction<PubSubMessage, T> mappingFn) {
        super(context.getSparkStreamingContext().ssc(), ClassTag$.MODULE$.apply(PubSubMessage.class));
        this.streamingContext = context.getSparkStreamingContext().ssc();
        this.config = config;
        this.readDuration = readDuration;
        this.context = context;
        this.autoAcknowledge = autoAcknowledge;
        this.mappingFn = mappingFn;
        this.pipeline = context.getPipelineName();
        this.credentials = PubSubSubscriberUtil.createCredentials(config.getServiceAccount(), config.isServiceAccountFilePath());
        this.backoffConfig = BackoffConfig.defaultInstance();
    }

    public Option<RDD<T>> compute(Time validTime) {
        LOG.debug("Computing RDD for time {}.", (Object)validTime);
        PubSubRDD pubSubRDD = new PubSubRDD(this.streamingContext.sparkContext(), validTime, this.readDuration, this.config, this.autoAcknowledge);
        RDD mapped = pubSubRDD.map(this.mappingFn, ClassTag$.MODULE$.apply(PubSubMessage.class));
        return Option.apply((Object)mapped);
    }

    public void start() {
        try {
            this.subscriptionAdminClient = this.buildSubscriptionAdminClient(this.credentials);
        }
        catch (IOException e) {
            throw new RuntimeException("SubscriptionAdminClient creation failed.", e);
        }
        if (this.config.getTopic() != null) {
            try {
                this.createSubscriptionIfNotPresent();
            }
            catch (IOException | InterruptedException e) {
                throw new RuntimeException("Subscription creation failed.", e);
            }
        }
        this.currentSnapshotName = this.fetchSnapShot(this.config.getSubscription(), this.context);
        if (this.currentSnapshotName != null) {
            this.seekSnapshot(this.currentSnapshotName, ProjectSubscriptionName.of((String)this.config.getProject(), (String)this.config.getSubscription()));
            this.takeSnapshot = false;
        } else {
            this.takeSnapshot = true;
        }
    }

    public void stop() {
        if (this.subscriptionAdminClient == null) {
            return;
        }
        this.subscriptionAdminClient.close();
    }

    public void onBatchStarted(io.cdap.cdap.etl.api.streaming.StreamingContext streamingContext) {
        LOG.debug("Starting a batch.");
        if (!this.takeSnapshot) {
            return;
        }
        String generatedSnapshotName = this.generateName(this.config.getSubscription());
        ProjectSnapshotName projectSnapshotName = ProjectSnapshotName.of((String)this.config.getProject(), (String)generatedSnapshotName);
        ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of((String)this.config.getProject(), (String)this.config.getSubscription());
        Snapshot snapshot = this.createSnapshot(projectSnapshotName, projectSubscriptionName);
        try {
            this.saveSnapshotAsState(snapshot, this.config.getSubscription(), this.context);
        }
        catch (IOException e) {
            this.deleteSnapshot(projectSnapshotName);
            throw new RuntimeException("Error while saving state.", e);
        }
        this.currentSnapshotName = projectSnapshotName;
    }

    public void onBatchRetry(io.cdap.cdap.etl.api.streaming.StreamingContext streamingContext) {
        LOG.debug("Batch is about to be retried. Seeking to snapshot {} for the current batch.", (Object)this.currentSnapshotName);
        this.seekSnapshot(this.currentSnapshotName, ProjectSubscriptionName.of((String)this.config.getProject(), (String)this.config.getSubscription()));
    }

    public void onBatchCompleted(io.cdap.cdap.etl.api.streaming.StreamingContext streamingContext) {
        LOG.debug("Batch completed called.");
        try {
            streamingContext.deleteState(this.config.getSubscription());
        }
        catch (IOException e) {
            throw new RuntimeException("Deleting state failed. ", e);
        }
        this.deleteSnapshot(this.currentSnapshotName);
        this.takeSnapshot = true;
        LOG.debug("Batch completed successfully. Deleted snapshot {} for the current batch.", (Object)this.currentSnapshotName);
    }

    private String generateName(String subscription) {
        String uuidString = UUID.randomUUID().toString();
        int maxLenForSubscriptionPrefix = 255 - uuidString.length() - 1;
        String subscriptionPrefix = subscription.length() > maxLenForSubscriptionPrefix ? subscription.substring(0, maxLenForSubscriptionPrefix) : subscription;
        return String.format("%s-%s", subscriptionPrefix, uuidString);
    }

    private void createSubscriptionIfNotPresent() throws IOException, InterruptedException {
        PubSubSubscriberUtil.createSubscription(() -> true, this.backoffConfig, ProjectSubscriptionName.format((String)this.config.getProject(), (String)this.config.getSubscription()), TopicName.format((String)this.config.getProject(), (String)this.config.getTopic()), () -> this.subscriptionAdminClient, PubSubSubscriberUtil::isApiExceptionRetryable);
    }

    private void seekSnapshot(ProjectSnapshotName projectSnapshotName, ProjectSubscriptionName projectSubscriptionName) {
        try {
            this.subscriptionAdminClient.seek(SeekRequest.newBuilder().setSnapshot(projectSnapshotName.toString()).setSubscription(projectSubscriptionName.toString()).build());
        }
        catch (NotFoundException e) {
            throw new RuntimeException(String.format("Saved snapshot %s not found. Please clear the application state to proceed. REST api for state deletion is namespaces/{namespace-id}/apps/{app-id}/state .", projectSnapshotName.toString()), e);
        }
    }

    @Nullable
    private ProjectSnapshotName fetchSnapShot(String subscriptionId, io.cdap.cdap.etl.api.streaming.StreamingContext context) {
        Optional state = null;
        try {
            state = context.getState(subscriptionId);
        }
        catch (IOException e) {
            throw new RuntimeException(String.format("Error fetching saved state for subscription %s.", subscriptionId), e);
        }
        if (!state.isPresent()) {
            LOG.debug("No saved state for {}.", (Object)subscriptionId);
            return null;
        }
        try {
            Snapshot snapshot = Snapshot.parseFrom((byte[])((byte[])state.get()));
            LOG.debug("Found existing snapshot {} .", (Object)snapshot.getName());
            return ProjectSnapshotName.parse((String)snapshot.getName());
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(String.format("Error parsing saved state for subscription %s.", subscriptionId), e);
        }
    }

    private Snapshot createSnapshot(ProjectSnapshotName projectSnapshotName, ProjectSubscriptionName projectSubscriptionName) {
        LOG.debug("Creating snapshot {} for subscription {} in Pub/Sub .", (Object)projectSnapshotName.toString(), (Object)projectSubscriptionName.toString());
        try {
            return PubSubSubscriberUtil.callWithRetry(() -> {
                CreateSnapshotRequest request = CreateSnapshotRequest.newBuilder().setName(projectSnapshotName.toString()).setSubscription(projectSubscriptionName.toString()).putAllLabels(Collections.singletonMap(CDAP_PIPELINE, PubSubDirectDStream.getLabelValue(this.pipeline))).build();
                return this.subscriptionAdminClient.createSnapshot(request);
            }, this.backoffConfig, 3);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    static String getLabelValue(String pipeline) {
        String labelValue = pipeline.toLowerCase();
        if (labelValue.length() > 63) {
            labelValue = labelValue.substring(0, 63);
            LOG.debug("Trimming pipeline name to 63 chars to add as label for snapshot.");
        }
        return labelValue;
    }

    private void deleteSnapshot(ProjectSnapshotName projectSnapshotName) {
        try {
            PubSubSubscriberUtil.callWithRetry(() -> {
                this.subscriptionAdminClient.deleteSnapshot(projectSnapshotName);
                return null;
            }, this.backoffConfig, 3);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void saveSnapshotAsState(Snapshot snapshot, String subscription, io.cdap.cdap.etl.api.streaming.StreamingContext context) throws IOException {
        LOG.debug("Saving snapshot {} in state .", (Object)snapshot.getName());
        context.saveState(subscription, snapshot.toByteArray());
    }

    private SubscriptionAdminClient buildSubscriptionAdminClient(Credentials credentials) throws IOException {
        SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder();
        if (credentials != null) {
            builder.setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)credentials));
        }
        RetrySettings retrySettings = PubSubSubscriberUtil.getRetrySettings();
        builder.seekSettings().setRetrySettings(retrySettings);
        builder.createSnapshotSettings().setRetrySettings(retrySettings);
        builder.deleteSnapshotSettings().setRetrySettings(retrySettings);
        return SubscriptionAdminClient.create((SubscriptionAdminSettings)builder.build());
    }
}

