/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers.gcs;

import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubQueueClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestPubsubMessagesFetcher {
    private static final String PROJECT_ID = "test-project";
    private static final String SUBSCRIPTION_ID = "test-subscription";
    private static final String SUBSCRIPTION_NAME = ProjectSubscriptionName.format((String)"test-project", (String)"test-subscription");
    private static final int SMALL_BATCH_SIZE = 1;
    private static final int MAX_MESSAGES_IN_REQUEST = 1000;
    private static final long MAX_WAIT_TIME_IN_REQUEST = TimeUnit.SECONDS.toMillis(1L);
    private final SubscriberStub mockSubscriber = (SubscriberStub)Mockito.mock(SubscriberStub.class);
    private final PubsubQueueClient mockPubsubQueueClient = (PubsubQueueClient)Mockito.mock(PubsubQueueClient.class);

    @Test
    public void testFetchMessages() throws IOException {
        ((SubscriberStub)Mockito.doNothing().when((Object)this.mockSubscriber)).close();
        Mockito.when((Object)this.mockPubsubQueueClient.getSubscriber((SubscriberStubSettings)ArgumentMatchers.any())).thenReturn((Object)this.mockSubscriber);
        Mockito.when((Object)this.mockPubsubQueueClient.getNumUnAckedMessages(SUBSCRIPTION_ID)).thenReturn((Object)3L);
        ((SubscriberStub)Mockito.doNothing().when((Object)this.mockSubscriber)).close();
        ReceivedMessage message1 = ReceivedMessage.newBuilder().setAckId("1").setMessage(PubsubMessage.newBuilder().setMessageId("msgId1").build()).build();
        ReceivedMessage message2 = ReceivedMessage.newBuilder().setAckId("2").setMessage(PubsubMessage.newBuilder().setMessageId("msgId2").build()).build();
        ReceivedMessage message3 = ReceivedMessage.newBuilder().setAckId("3").setMessage(PubsubMessage.newBuilder().setMessageId("msgId3").build()).build();
        Mockito.when((Object)this.mockPubsubQueueClient.makePullRequest(this.mockSubscriber, SUBSCRIPTION_NAME, 1)).thenReturn((Object)PullResponse.newBuilder().addReceivedMessages(message1).build()).thenReturn((Object)PullResponse.newBuilder().addReceivedMessages(message2).build()).thenReturn((Object)PullResponse.newBuilder().addReceivedMessages(message3).build());
        PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher(PROJECT_ID, SUBSCRIPTION_ID, 1, 1000, MAX_WAIT_TIME_IN_REQUEST, this.mockPubsubQueueClient);
        List messages = fetcher.fetchMessages();
        Assertions.assertEquals((int)3, (int)messages.size());
        Assertions.assertEquals((Object)"1", (Object)((ReceivedMessage)messages.get(0)).getAckId());
        Assertions.assertEquals((Object)"2", (Object)((ReceivedMessage)messages.get(1)).getAckId());
        Assertions.assertEquals((Object)"3", (Object)((ReceivedMessage)messages.get(2)).getAckId());
        ((PubsubQueueClient)Mockito.verify((Object)this.mockPubsubQueueClient, (VerificationMode)Mockito.times((int)3))).makePullRequest(this.mockSubscriber, SUBSCRIPTION_NAME, 1);
    }

    @Test
    public void testFetchMessagesZeroTimeout() throws IOException {
        ((SubscriberStub)Mockito.doNothing().when((Object)this.mockSubscriber)).close();
        Mockito.when((Object)this.mockPubsubQueueClient.getSubscriber((SubscriberStubSettings)ArgumentMatchers.any())).thenReturn((Object)this.mockSubscriber);
        Mockito.when((Object)this.mockPubsubQueueClient.getNumUnAckedMessages(SUBSCRIPTION_ID)).thenReturn((Object)100L);
        PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher(PROJECT_ID, SUBSCRIPTION_ID, 1, 1000, 0L, this.mockPubsubQueueClient);
        List messages = fetcher.fetchMessages();
        Assertions.assertEquals((int)0, (int)messages.size());
    }

    @Test
    public void testSendAcks() throws IOException {
        ((SubscriberStub)Mockito.doNothing().when((Object)this.mockSubscriber)).close();
        Mockito.when((Object)this.mockPubsubQueueClient.getSubscriber((SubscriberStubSettings)ArgumentMatchers.any())).thenReturn((Object)this.mockSubscriber);
        List messageAcks = IntStream.range(0, 20).mapToObj(i -> "msg_" + i).collect(Collectors.toList());
        ((PubsubQueueClient)Mockito.doNothing().when((Object)this.mockPubsubQueueClient)).makeAckRequest((SubscriberStub)ArgumentMatchers.eq((Object)this.mockSubscriber), (String)ArgumentMatchers.eq((Object)SUBSCRIPTION_NAME), (List)ArgumentMatchers.any());
        PubsubMessagesFetcher fetcher = new PubsubMessagesFetcher(PROJECT_ID, SUBSCRIPTION_ID, 1, 1000, MAX_WAIT_TIME_IN_REQUEST, this.mockPubsubQueueClient);
        fetcher.sendAcks(messageAcks);
        ((PubsubQueueClient)Mockito.verify((Object)this.mockPubsubQueueClient, (VerificationMode)Mockito.times((int)2))).makeAckRequest((SubscriberStub)ArgumentMatchers.eq((Object)this.mockSubscriber), (String)ArgumentMatchers.eq((Object)SUBSCRIPTION_NAME), (List)ArgumentMatchers.any());
    }
}

