/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.publisher.source;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import io.cdap.plugin.gcp.publisher.source.PubSubMessage;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberConfig;
import io.cdap.plugin.gcp.publisher.source.PubSubSubscriberUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

/**
 * Scala {@link Iterator} that synchronously pulls messages from a Pub/Sub subscription
 * for the duration of one batch window.
 *
 * <p>The window ends at {@code batchTime.milliseconds() + batchDuration}. Messages are pulled
 * in chunks of at most {@link #MAX_MESSAGES}, optionally acknowledged right after the pull
 * (when {@code autoAcknowledge} is set), buffered, and handed out one at a time by
 * {@link #next()}. The subscriber client is created lazily on the first pull and is shut down
 * by a task completion listener registered on the supplied {@link TaskContext}.
 *
 * <p>NOTE(review): {@code messageCount}, {@code subscriber} and {@code pullRequest} are plain
 * fields, so this class presumably expects to be driven by a single task thread - confirm
 * against the caller.
 */
public class PubSubRDDIterator
implements Iterator<PubSubMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubRDDIterator.class);
    /** Upper bound on the number of messages requested per pull call. */
    private static final int MAX_MESSAGES = 1000;
    /** Maximum inbound gRPC message size in bytes (20 MiB, i.e. 0x1400000). */
    private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;
    /** Pause, in milliseconds, between two pulls when a pull returned no messages. */
    private static final int RETRY_DELAY = 100;
    /** How long, in seconds, to wait for the subscriber to terminate after shutdown. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30L;

    private final long startTime;
    private final PubSubSubscriberConfig config;
    private final TaskContext context;
    private final long batchDuration;
    private final String subscriptionFormatted;
    private final boolean autoAcknowledge;
    private final Queue<ReceivedMessage> receivedMessages;
    private PullRequest pullRequest;
    private SubscriberStub subscriber;
    private long messageCount;

    /**
     * Creates an iterator for one batch. No connection is opened until the first call to
     * {@link #hasNext()} that needs to pull.
     *
     * @param config           subscriber configuration supplying project, subscription and
     *                         service account details
     * @param context          task context used to register the subscriber shutdown hook
     * @param batchTime        start of the batch; its millisecond value is the window start
     * @param batchDuration    length of the window in milliseconds
     * @param autoAcknowledge  whether pulled messages are acknowledged immediately after the pull
     */
    public PubSubRDDIterator(PubSubSubscriberConfig config, TaskContext context, Time batchTime, long batchDuration, boolean autoAcknowledge) {
        this.config = config;
        this.context = context;
        this.batchDuration = batchDuration;
        this.startTime = batchTime.milliseconds();
        this.autoAcknowledge = autoAcknowledge;
        this.subscriptionFormatted = ProjectSubscriptionName.format(this.config.getProject(), this.config.getSubscription());
        this.receivedMessages = new ConcurrentLinkedDeque<ReceivedMessage>();
    }

    /**
     * Reports whether another message is available, pulling from Pub/Sub when the local
     * buffer is empty and the batch window has not yet elapsed.
     *
     * @return {@code true} if a buffered or freshly pulled message is available;
     *         {@code false} once the window has expired or a pull attempt yielded nothing
     * @throws RuntimeException wrapping the {@link IOException} raised while building the
     *                          subscriber client
     */
    public boolean hasNext() {
        if (!this.receivedMessages.isEmpty()) {
            return true;
        }
        long currentTimeMillis = System.currentTimeMillis();
        long expiryTimeMillis = this.startTime + this.batchDuration;
        if (currentTimeMillis >= expiryTimeMillis) {
            LOG.debug("Time exceeded for batch. Total time is {} millis. Total messages returned is {} .", currentTimeMillis - this.startTime, this.messageCount);
            return false;
        }
        try {
            List<ReceivedMessage> messages = this.fetchAndAck(expiryTimeMillis);
            if (messages.isEmpty()) {
                LOG.debug("No more messages. Total messages returned is {} .", this.messageCount);
                return false;
            }
            this.receivedMessages.addAll(messages);
            return true;
        }
        catch (IOException e) {
            throw new RuntimeException("Error reading messages from Pub/Sub. ", e);
        }
    }

    /**
     * Removes and returns the next buffered message.
     *
     * @return the next message wrapped as a {@link PubSubMessage}
     * @throws IllegalStateException if the buffer is empty, i.e. {@link #hasNext()} was not
     *                               called first or returned {@code false}
     */
    public PubSubMessage next() {
        ReceivedMessage currentMessage = this.receivedMessages.poll();
        if (currentMessage == null) {
            throw new IllegalStateException("Unexpected state. No messages available.");
        }
        ++this.messageCount;
        return new PubSubMessage(currentMessage);
    }

    /**
     * Builds a gRPC subscriber stub using the configured service account (when credentials
     * could be created), the {@link #MAX_MESSAGE_SIZE} inbound limit and the retry settings
     * supplied by {@link PubSubSubscriberUtil}.
     *
     * @return a newly created subscriber stub
     * @throws IOException if the credentials or the stub cannot be created
     */
    private SubscriberStub buildSubscriberClient() throws IOException {
        SubscriberStubSettings.Builder builder = SubscriberStubSettings.newBuilder();
        Credentials credentials = PubSubSubscriberUtil.createCredentials(this.config.getServiceAccount(), this.config.isServiceAccountFilePath());
        if (credentials != null) {
            builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
        }
        builder.setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(MAX_MESSAGE_SIZE).build());
        builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings());
        return GrpcSubscriberStub.create(builder.build());
    }

    /**
     * Pulls until at least one message arrives or the window expires, acknowledging the
     * pulled messages when auto-acknowledge is enabled.
     *
     * @param expiryTimeMillis wall-clock time, in epoch milliseconds, after which no further
     *                         pull is attempted
     * @return the messages of the first non-empty pull, or an empty list if the window expired
     *         or the thread was interrupted while waiting to retry
     * @throws IOException if the subscriber client cannot be created
     */
    private List<ReceivedMessage> fetchAndAck(long expiryTimeMillis) throws IOException {
        if (this.subscriber == null) {
            this.initSubscriber();
        }
        while (System.currentTimeMillis() < expiryTimeMillis) {
            PullResponse pullResponse = this.subscriber.pullCallable().call(this.pullRequest);
            List<ReceivedMessage> pulled = pullResponse.getReceivedMessagesList();
            if (!pulled.isEmpty()) {
                List<String> ackIds = pulled.stream().map(ReceivedMessage::getAckId).collect(Collectors.toList());
                this.ackMessages(ackIds, this.autoAcknowledge, this.subscriptionFormatted);
                return pulled;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(RETRY_DELAY);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so the code running this task can still see
                // that the thread was asked to stop.
                Thread.currentThread().interrupt();
                LOG.debug("Interrupted while waiting for retry. ", e);
                return Collections.emptyList();
            }
        }
        return Collections.emptyList();
    }

    /**
     * Creates the subscriber client and the reusable pull request, and registers a task
     * completion listener that shuts the client down.
     *
     * @throws IOException if the subscriber client cannot be created
     */
    private void initSubscriber() throws IOException {
        this.subscriber = this.buildSubscriberClient();
        // Keep this a block lambda: it is only void-compatible, which keeps overload
        // resolution of addTaskCompletionListener identical to the original code.
        this.context.addTaskCompletionListener(context1 -> {
            this.shutdownSubscriber();
        });
        this.pullRequest = PullRequest.newBuilder().setMaxMessages(MAX_MESSAGES).setSubscription(this.subscriptionFormatted).build();
    }

    /**
     * Shuts the subscriber down, if it exists and is still running, and waits up to
     * {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds for it to terminate.
     */
    private void shutdownSubscriber() {
        if (this.subscriber == null || this.subscriber.isShutdown()) {
            return;
        }
        this.subscriber.shutdown();
        try {
            this.subscriber.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread instead of dropping it.
            Thread.currentThread().interrupt();
            LOG.warn("Exception in shutting down subscriber. ", e);
        }
    }

    /**
     * Acknowledges the given ack ids on the subscription; does nothing when
     * auto-acknowledge is disabled.
     *
     * @param ackIds                ack ids of the messages to acknowledge
     * @param autoAcknowledge       whether acknowledgement should be sent at all
     * @param subscriptionFormatted fully formatted subscription name to acknowledge against
     */
    private void ackMessages(List<String> ackIds, boolean autoAcknowledge, String subscriptionFormatted) {
        if (!autoAcknowledge) {
            return;
        }
        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(subscriptionFormatted).addAllAckIds(ackIds).build();
        this.subscriber.acknowledgeCallable().call(acknowledgeRequest);
    }
}

