/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers.gcs;

import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubQueueClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubMessagesFetcher {
    private static final int DEFAULT_BATCH_SIZE_ACK_API = 10;
    private static final long MAX_WAIT_TIME_TO_ACK_MESSAGES = TimeUnit.MINUTES.toMillis(1L);
    private static final int ACK_PRODUCER_THREAD_POOL_SIZE = 3;
    private final ExecutorService threadPool = Executors.newFixedThreadPool(3);
    private final String googleProjectId;
    private final String pubsubSubscriptionId;
    private final int batchSize;
    private final int maxMessagesPerSync;
    private final long maxFetchTimePerSyncSecs;
    private final SubscriberStubSettings subscriberStubSettings;
    private final PubsubQueueClient pubsubQueueClient;
    private static final Logger LOG = LoggerFactory.getLogger(PubsubMessagesFetcher.class);

    public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize, int maxMessagesPerSync, long maxFetchTimePerSyncSecs, PubsubQueueClient pubsubQueueClient) {
        this.googleProjectId = googleProjectId;
        this.pubsubSubscriptionId = pubsubSubscriptionId;
        this.batchSize = batchSize;
        this.maxMessagesPerSync = maxMessagesPerSync;
        this.maxFetchTimePerSyncSecs = maxFetchTimePerSyncSecs;
        try {
            this.subscriberStubSettings = ((SubscriberStubSettings.Builder)SubscriberStubSettings.newBuilder().setTransportChannelProvider((TransportChannelProvider)SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(Integer.valueOf(0x1400000)).build())).build();
        }
        catch (IOException e) {
            throw new HoodieException("Error creating subscriber stub settings", (Throwable)e);
        }
        this.pubsubQueueClient = pubsubQueueClient;
    }

    public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize, int maxMessagesPerSync, long maxFetchTimePerSyncSecs) {
        this(googleProjectId, pubsubSubscriptionId, batchSize, maxMessagesPerSync, maxFetchTimePerSyncSecs, new PubsubQueueClient());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public List<ReceivedMessage> fetchMessages() {
        ArrayList<ReceivedMessage> messageList = new ArrayList<ReceivedMessage>();
        try (SubscriberStub subscriber = this.pubsubQueueClient.getSubscriber(this.subscriberStubSettings);){
            String subscriptionName = ProjectSubscriptionName.format((String)this.googleProjectId, (String)this.pubsubSubscriptionId);
            long startTime = System.currentTimeMillis();
            long unAckedMessages = this.pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId);
            LOG.info("Found unacked messages " + unAckedMessages);
            while ((long)messageList.size() < unAckedMessages && messageList.size() < this.maxMessagesPerSync && System.currentTimeMillis() - startTime < this.maxFetchTimePerSyncSecs * 1000L) {
                PullResponse pullResponse = this.pubsubQueueClient.makePullRequest(subscriber, subscriptionName, this.batchSize);
                messageList.addAll(pullResponse.getReceivedMessagesList());
            }
            ArrayList<ReceivedMessage> arrayList = messageList;
            return arrayList;
        }
        catch (Exception e) {
            throw new HoodieException("Error when fetching metadata", (Throwable)e);
        }
    }

    public void sendAcks(List<String> messagesToAck) throws IOException {
        try (SubscriberStub subscriber = this.pubsubQueueClient.getSubscriber(this.subscriberStubSettings);){
            int numberOfBatches = (int)Math.ceil((double)messagesToAck.size() / 10.0);
            CompletableFuture.allOf((CompletableFuture[])IntStream.range(0, numberOfBatches).parallel().boxed().map(batchIndex -> this.getTask(subscriber, messagesToAck, (int)batchIndex)).toArray(CompletableFuture[]::new)).get(MAX_WAIT_TIME_TO_ACK_MESSAGES, TimeUnit.MILLISECONDS);
            LOG.debug("Flushed out all outstanding acknowledged messages: " + messagesToAck.size());
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IOException("Failed to ack messages from PubSub", e);
        }
    }

    private CompletableFuture<Void> getTask(SubscriberStub subscriber, List<String> messagesToAck, int batchIndex) {
        String subscriptionName = ProjectSubscriptionName.format((String)this.googleProjectId, (String)this.pubsubSubscriptionId);
        List<String> messages = messagesToAck.subList(batchIndex, Math.min(batchIndex + 10, messagesToAck.size()));
        return CompletableFuture.runAsync(() -> this.pubsubQueueClient.makeAckRequest(subscriber, subscriptionName, messages), this.threadPool);
    }
}

