/*
 * Decompiled with CFR 0.152.
 */
package com.mineplex.studio.messaging.client;

import com.mineplex.studio.messaging.client.MessagingPollingClient;
import com.mineplex.studio.messaging.contracts.Message;
import com.mineplex.studio.messaging.contracts.MessagingGrpc;
import com.mineplex.studio.messaging.contracts.StreamMessagesRequest;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Forwards messages from the messaging service's server stream to a caller-supplied
 * consumer, re-running the stream under a resilience4j {@link Retry} and using a
 * {@link MessagingPollingClient} to list unread messages on every attempt after the first.
 */
class MessagingStreamingClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MessagingStreamingClient.class);

    /**
     * Retry policy for the stream: at most 10 attempts, with an exponential random backoff
     * built from a 500 ms initial interval, a 1.5 multiplier, a 0.5 randomization factor and
     * a 10 s maximum interval.
     */
    private static final RetryConfig STREAM_CONNECTION_RETRY = RetryConfig.custom()
            .maxAttempts(10)
            .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(
                    Duration.ofMillis(500L), 1.5, 0.5, Duration.ofSeconds(10L)))
            .build();

    private final MessagingGrpc.MessagingBlockingStub blockingStub;

    /**
     * Opens the message stream for the given target types and hands every received message
     * to {@code messageConsumer}. The work runs synchronously on the calling thread, wrapped
     * in a {@link Retry} named {@code "messaging-stream-retry"}.
     *
     * <p>On every attempt after the first, the unread messages listed by the polling client
     * (starting from the timestamp of the last message seen) are delivered before the live
     * stream is drained.
     *
     * <p>NOTE(review): what happens once the attempts are exhausted is decided by resilience4j
     * and is not visible here; presumably the last failure propagates to the caller — confirm.
     *
     * @param targetTypes     target class types added to the stream request and the polling filter
     * @param namespaceId     namespace to filter on; only set on the request when non-null
     * @param projectId       project to filter on; only set on the request when non-null
     * @param messageConsumer callback invoked for every delivered message
     * @throws NullPointerException if {@code targetTypes} or {@code messageConsumer} is null
     */
    public void subscribeToMessageStream(@NonNull List<String> targetTypes, @Nullable String namespaceId, @Nullable String projectId, @NonNull Consumer<Message> messageConsumer) {
        if (targetTypes == null) {
            throw new NullPointerException("targetTypes is marked non-null but is null");
        }
        if (messageConsumer == null) {
            throw new NullPointerException("messageConsumer is marked non-null but is null");
        }

        StreamMessagesRequest.Builder requestBuilder = StreamMessagesRequest.newBuilder().addAllTargetClassTypes(targetTypes);

        MessagingPollingClient.PollingClientFilter catchUpFilter = MessagingPollingClient.PollingClientFilter.builder()
                .identifier(this.toString())
                .targetTypes(targetTypes)
                .namespace(namespaceId)
                .project(projectId)
                .build();
        MessagingPollingClient catchUpClient = new MessagingPollingClient(catchUpFilter, this.blockingStub, Instant.now());

        if (namespaceId != null) {
            requestBuilder.setNamespaceId(namespaceId);
        }
        if (projectId != null) {
            requestBuilder.setProjectId(projectId);
        }

        AtomicReference<Instant> lastSeen = new AtomicReference<>(Instant.now());
        AtomicInteger attempts = new AtomicInteger(0);

        Retry streamRetry = Retry.of("messaging-stream-retry", STREAM_CONNECTION_RETRY);
        Runnable guardedStream = Retry.decorateRunnable(
                streamRetry,
                () -> this.runStreamAttempt(requestBuilder, catchUpClient, lastSeen, attempts, messageConsumer));
        guardedStream.run();
    }

    /**
     * Performs one stream attempt: opens the stream, replays unread messages when this is not
     * the first attempt, then drains the live stream.
     *
     * <p>NOTE(review): the backlog path records the timestamp after the consumer runs, while
     * the live path records it before. Whether that asymmetry is intentional cannot be told
     * from this class — confirm before changing either order.
     */
    private void runStreamAttempt(StreamMessagesRequest.Builder requestBuilder, MessagingPollingClient catchUpClient, AtomicReference<Instant> lastSeen, AtomicInteger attempts, Consumer<Message> messageConsumer) {
        // The stream is opened before the backlog is listed, matching the original statement order.
        Iterator<Message> liveMessages = this.blockingStub.streamMessages(requestBuilder.build());

        boolean isReattempt = attempts.incrementAndGet() > 1;
        if (isReattempt) {
            catchUpClient.setLastListTime(lastSeen.get());
            catchUpClient.listUnreadMessages().forEach(missed -> {
                messageConsumer.accept(missed);
                lastSeen.set(timestampOf(missed));
            });
        }

        while (liveMessages.hasNext()) {
            Message incoming = liveMessages.next();
            lastSeen.set(timestampOf(incoming));
            messageConsumer.accept(incoming);
        }
    }

    /** Converts the message's timestamp (seconds and nanos) into an {@link Instant}. */
    private static Instant timestampOf(Message message) {
        return Instant.ofEpochSecond(message.getTimestamp().getSeconds(), message.getTimestamp().getNanos());
    }

    @Generated
    public MessagingStreamingClient(MessagingGrpc.MessagingBlockingStub blockingStub) {
        this.blockingStub = blockingStub;
    }
}

