/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.TopicName;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.publisher.source.BackoffConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubMessage;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberUtil;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubSubReceiver
extends Receiver<PubSubMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubReceiver.class);
    private static final String CREATE_SUBSCRIPTION_ERROR_MSG = "Failed to create subscription '%s'.";
    private static final String CREATE_SUBSCRIPTION_ADMIN_CLIENT_ERROR_MSG = "Failed to create subscription client to manage subscription '%s'.";
    private static final String CREATE_SUBSCRIPTION_RETRY_ERROR_MSG = "Failed to create subscription '%s' after 5 attempts";
    private static final String MISSING_TOPIC_ERROR_MSG = "Failed to create subscription. Topic '%s' was not found in project '%s'.";
    private static final String SUBSCRIBER_ERROR_MSG = "Failed to create subscriber using subscription '%s' for project '%s'.";
    private static final String FETCH_ERROR_MSG = "Failed to fetch new messages using subscription '%s' for project '%s'.";
    private static final String INTERRUPTED_EXCEPTION_MSG = "Interrupted Exception when sleeping during backoff.";
    private final PubSubSubscriberConfig config;
    private final boolean autoAcknowledge;
    private final BackoffConfig backoffConfig;
    private int previousFetchRate = -1;
    private transient String project;
    private transient String topic;
    private transient String subscription;
    private transient Credentials credentials;
    private transient ScheduledThreadPoolExecutor executor;
    private transient SubscriberStub subscriber;
    private transient AtomicInteger bucket;

    public PubSubReceiver(PubSubSubscriberConfig config, boolean autoAcknowledge, StorageLevel storageLevel) {
        this(config, autoAcknowledge, storageLevel, BackoffConfig.defaultInstance());
    }

    public PubSubReceiver(PubSubSubscriberConfig config, boolean autoAcknowledge, StorageLevel storageLevel, BackoffConfig backoffConfig) {
        super(storageLevel);
        this.config = config;
        this.autoAcknowledge = autoAcknowledge;
        this.backoffConfig = backoffConfig;
    }

    @VisibleForTesting
    public PubSubReceiver(String project, String topic, String subscription, Credentials credentials, boolean autoAcknowledge, StorageLevel storageLevel, BackoffConfig backoffConfig, ScheduledThreadPoolExecutor executor, SubscriberStub subscriber, AtomicInteger bucket) {
        super(storageLevel);
        this.backoffConfig = backoffConfig;
        this.project = project;
        this.topic = topic;
        this.subscription = subscription;
        this.autoAcknowledge = autoAcknowledge;
        this.credentials = credentials;
        this.executor = executor;
        this.subscriber = subscriber;
        this.bucket = bucket;
        this.config = null;
    }

    public void onStart() {
        this.executor = new ScheduledThreadPoolExecutor(3, new LoggingRejectedExecutionHandler());
        this.executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.executor.setRemoveOnCancelPolicy(true);
        this.bucket = new AtomicInteger();
        this.project = this.config.getProject();
        this.subscription = ProjectSubscriptionName.format((String)this.config.getProject(), (String)this.config.getSubscription());
        this.credentials = this.createCredentials();
        if (this.config.getTopic() != null) {
            this.topic = TopicName.format((String)this.config.getProject(), (String)this.config.getTopic());
            this.createSubscription();
        }
        this.subscriber = this.createSubscriberClient();
        this.scheduleTasks();
        LOG.info("Receiver started execution");
    }

    public void onStop() {
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOG.error("InterruptedException while waiting for executor to shutdown.");
            }
        }
        if (this.subscriber != null && !this.subscriber.isShutdown()) {
            this.subscriber.shutdown();
            try {
                this.subscriber.awaitTermination(30L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOG.error("InterruptedException while waiting for subscriber to shutdown.");
            }
        }
        LOG.info("Receiver completed execution");
    }

    @Nullable
    protected Credentials createCredentials() {
        if (this.isStopped()) {
            return null;
        }
        try {
            return this.config.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(this.config.getServiceAccount(), this.config.isServiceAccountFilePath());
        }
        catch (IOException e) {
            this.stop("Unable to get credentials for receiver.", e);
            return null;
        }
    }

    protected void createSubscription() {
        if (this.isStopped()) {
            return;
        }
        try (SubscriptionAdminClient subscriptionAdminClient = this.buildSubscriptionAdminClient();){
            PubSubSubscriberUtil.createSubscription(() -> !this.isStopped(), this.backoffConfig, this.subscription, this.topic, () -> subscriptionAdminClient, this::isApiExceptionRetryable);
        }
        catch (InterruptedException e) {
            this.stop(INTERRUPTED_EXCEPTION_MSG, e);
        }
        catch (IOException e) {
            this.stop(String.format(CREATE_SUBSCRIPTION_ADMIN_CLIENT_ERROR_MSG, this.subscription), e);
        }
        catch (ApiException e) {
            if (e.getStatusCode().getCode().equals((Object)StatusCode.Code.NOT_FOUND)) {
                String message = String.format(MISSING_TOPIC_ERROR_MSG, this.topic, this.project);
                this.stop(message, e);
                return;
            }
            this.stop(String.format(CREATE_SUBSCRIPTION_ERROR_MSG, this.subscription), e);
        }
        catch (RuntimeException e) {
            this.stop(String.format(CREATE_SUBSCRIPTION_RETRY_ERROR_MSG, this.subscription), e);
        }
    }

    @Nullable
    public SubscriberStub createSubscriberClient() {
        if (this.isStopped()) {
            return null;
        }
        try {
            return this.buildSubscriberClient();
        }
        catch (IOException ioe) {
            String message = String.format(SUBSCRIBER_ERROR_MSG, this.subscription, this.project);
            this.stop(message, ioe);
            return null;
        }
    }

    public void scheduleTasks() {
        if (!this.isStopped()) {
            this.executor.scheduleAtFixedRate(this::updateMessageRateAndFillBucket, 0L, 1L, TimeUnit.SECONDS);
            this.executor.scheduleWithFixedDelay(this::receiveMessages, 100L, 100L, TimeUnit.MILLISECONDS);
        }
    }

    protected void receiveMessages() {
        int backoff = this.backoffConfig.getInitialBackoffMs();
        while (!this.isStopped()) {
            try {
                this.fetchAndAck();
                return;
            }
            catch (ApiException ae) {
                if (this.isApiExceptionRetryable(ae)) {
                    backoff = this.sleepAndIncreaseBackoff(backoff);
                    continue;
                }
                String message = String.format(FETCH_ERROR_MSG, this.subscription, this.project);
                this.restart(message, ae);
                break;
            }
        }
    }

    protected void fetchAndAck() {
        int maxMessages = this.bucket.get();
        if (maxMessages <= 0) {
            return;
        }
        PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(maxMessages).setSubscription(this.subscription).build();
        PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call((Object)pullRequest);
        List receivedMessages = pullResponse.getReceivedMessagesList();
        if (receivedMessages.isEmpty()) {
            return;
        }
        this.bucket.updateAndGet(x -> x - receivedMessages.size());
        if (this.isStopped()) {
            LOG.trace("Receiver stopped before store and ack.");
            return;
        }
        List messages = receivedMessages.stream().map(PubSubMessage::new).collect(Collectors.toList());
        this.store(messages.iterator());
        if (this.autoAcknowledge) {
            List ackIds = messages.stream().map(PubSubMessage::getAckId).collect(Collectors.toList());
            AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(this.subscription).addAllAckIds(ackIds).build();
            this.subscriber.acknowledgeCallable().call((Object)acknowledgeRequest);
        }
    }

    protected SubscriptionAdminSettings buildSubscriptionAdminSettings() throws IOException {
        SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder();
        if (this.credentials != null) {
            builder.setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.credentials));
        }
        return builder.build();
    }

    protected SubscriberStubSettings buildSubscriberSettings() throws IOException {
        SubscriberStubSettings.Builder builder = SubscriberStubSettings.newBuilder();
        if (this.credentials != null) {
            builder.setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.credentials));
        }
        return builder.build();
    }

    protected SubscriptionAdminClient buildSubscriptionAdminClient() {
        try {
            return SubscriptionAdminClient.create((SubscriptionAdminSettings)this.buildSubscriptionAdminSettings());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected SubscriberStub buildSubscriberClient() throws IOException {
        return GrpcSubscriberStub.create((SubscriberStubSettings)this.buildSubscriberSettings());
    }

    protected int sleepAndIncreaseBackoff(int backoff) {
        try {
            if (!this.isStopped()) {
                LOG.trace("Backoff - Sleeping for {} ms.", (Object)backoff);
                Thread.sleep(backoff);
            }
        }
        catch (InterruptedException e) {
            this.stop(INTERRUPTED_EXCEPTION_MSG, e);
        }
        return this.calculateUpdatedBackoff(backoff);
    }

    protected int calculateUpdatedBackoff(int backoff) {
        return Math.min((int)((double)backoff * this.backoffConfig.getBackoffFactor()), this.backoffConfig.getMaximumBackoffMs());
    }

    protected void updateMessageRateAndFillBucket() {
        int messageRate = (int)Math.min(this.supervisor().getCurrentRateLimit(), Integer.MAX_VALUE);
        if (messageRate != this.previousFetchRate) {
            this.previousFetchRate = messageRate;
            LOG.trace("Receiver fetch rate is set to: {}", (Object)messageRate);
        }
        this.bucket.set(messageRate);
    }

    protected boolean isApiExceptionRetryable(ApiException ae) {
        return PubSubSubscriberUtil.isApiExceptionRetryable(ae);
    }

    protected static class LoggingRejectedExecutionHandler
    implements RejectedExecutionHandler {
        protected LoggingRejectedExecutionHandler() {
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            LOG.error("Thread Pool rejected execution of a task.");
        }
    }

    public static class BackoffConfigBuilder
    implements Serializable {
        public int initialBackoffMs = 100;
        public int maximumBackoffMs = 10000;
        public double backoffFactor = 2.0;

        protected BackoffConfigBuilder() {
        }

        public static BackoffConfigBuilder getInstance() {
            return new BackoffConfigBuilder();
        }

        public BackoffConfig build() {
            if (this.initialBackoffMs > this.maximumBackoffMs) {
                throw new IllegalArgumentException("Maximum backoff cannot be smaller than Initial backoff");
            }
            return new BackoffConfig(this.initialBackoffMs, this.maximumBackoffMs, this.backoffFactor);
        }

        public int getInitialBackoffMs() {
            return this.initialBackoffMs;
        }

        public BackoffConfigBuilder setInitialBackoffMs(int initialBackoffMs) {
            this.initialBackoffMs = initialBackoffMs;
            return this;
        }

        public int getMaximumBackoffMs() {
            return this.maximumBackoffMs;
        }

        public BackoffConfigBuilder setMaximumBackoffMs(int maximumBackoffMs) {
            this.maximumBackoffMs = maximumBackoffMs;
            return this;
        }

        public double getBackoffFactor() {
            return this.backoffFactor;
        }

        public BackoffConfigBuilder setBackoffFactor(int backoffFactor) {
            this.backoffFactor = backoffFactor;
            return this;
        }
    }
}

