/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.PullSubscriber;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/**
 * A {@link PullSubscriber} that buffers messages handed to it by an underlying
 * {@link Subscriber} and returns them in batches from {@link #pull()}.
 *
 * <p>All mutable state is guarded by this object's monitor: the message callback,
 * the failure callback, {@link #pull()} and {@link #nextOffset()} are all
 * {@code synchronized}.
 */
public class BufferingPullSubscriber
implements PullSubscriber<SequencedMessage> {
    private final Subscriber underlying;
    /** Failure reported by the underlying subscriber's listener, if any. */
    @GuardedBy("this")
    private Optional<CheckedApiException> error = Optional.empty();
    /** Messages received through the callback and not yet returned by {@link #pull()}. */
    @GuardedBy("this")
    private Deque<SequencedMessage> messages = new ArrayDeque<>();
    /** Offset of the last message returned by {@link #pull()}, if any. */
    @GuardedBy("this")
    private Optional<Offset> lastDelivered = Optional.empty();

    /**
     * Creates the underlying subscriber, starts it, waits for it to be running and
     * grants it the initial flow-control allowance taken from {@code settings}.
     *
     * <p>If any step after the underlying subscriber has been created throws, the
     * underlying subscriber is asked to stop before the exception propagates, so a
     * half-constructed instance does not leave a started service behind that no
     * caller could ever {@link #close()}.
     *
     * @param factory creates the underlying subscriber; it is given this object's
     *     message callback
     * @param settings supplies the initial allowed message count and byte count
     * @throws CheckedApiException if creating or configuring the underlying subscriber fails
     */
    public BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings) throws CheckedApiException {
        this.underlying = factory.newSubscriber(this::addMessages);
        boolean initialized = false;
        try {
            this.underlying.addListener(new ApiService.Listener(){

                @Override
                public void failed(ApiService.State state, Throwable throwable) {
                    BufferingPullSubscriber.this.fail(ExtractStatus.toCanonical(throwable));
                }
            }, MoreExecutors.directExecutor());
            this.underlying.startAsync().awaitRunning();
            FlowControlRequest initialAllowance = FlowControlRequest.newBuilder()
                .setAllowedMessages(settings.messagesOutstanding())
                .setAllowedBytes(settings.bytesOutstanding())
                .build();
            this.underlying.allowFlow(initialAllowance);
            initialized = true;
        } finally {
            if (!initialized) {
                // Only request the stop; do not await termination here, because an
                // exception thrown while waiting would replace the original failure.
                // NOTE(review): stopAsync() is expected to be a no-op on a service that
                // has already failed or terminated - confirm against ApiService docs.
                this.underlying.stopAsync();
            }
        }
    }

    /** Records {@code e} so that subsequent calls to {@link #pull()} throw it. */
    private synchronized void fail(CheckedApiException e) {
        this.error = Optional.of(e);
    }

    /** Message callback given to the subscriber factory: appends to the buffer. */
    private synchronized void addMessages(Collection<SequencedMessage> newMessages) {
        this.messages.addAll(newMessages);
    }

    /**
     * Returns every message buffered since the previous call, in buffer order.
     *
     * <p>When a non-empty batch is drained, the underlying subscriber is granted
     * additional flow-control allowance equal to the batch's message count and
     * total byte size, and the offset of the batch's last message is remembered
     * for {@link #nextOffset()}.
     *
     * @return the drained messages, or an empty list when nothing is buffered
     * @throws CheckedApiException if a failure of the underlying subscriber has been
     *     recorded, or if granting the flow-control allowance throws
     */
    @Override
    public synchronized List<SequencedMessage> pull() throws CheckedApiException {
        if (this.error.isPresent()) {
            throw this.error.get();
        }
        if (this.messages.isEmpty()) {
            return ImmutableList.of();
        }
        // Swap in a fresh buffer rather than copying and clearing the old one.
        Deque<SequencedMessage> drained = this.messages;
        this.messages = new ArrayDeque<>();
        long bytes = drained.stream().mapToLong(SequencedMessage::byteSize).sum();
        FlowControlRequest replenishment = FlowControlRequest.newBuilder()
            .setAllowedBytes(bytes)
            .setAllowedMessages(drained.size())
            .build();
        this.underlying.allowFlow(replenishment);
        // Updated only after allowFlow succeeds, so a failed pull does not advance nextOffset().
        this.lastDelivered = Optional.of(Iterables.getLast(drained).offset());
        return ImmutableList.copyOf(drained);
    }

    /**
     * Returns the offset one past the last message returned by {@link #pull()}.
     *
     * @return the next offset, or empty if {@link #pull()} has not yet returned any message
     */
    @Override
    public synchronized Optional<Offset> nextOffset() {
        return this.lastDelivered.map(offset -> Offset.of(offset.value() + 1L));
    }

    /**
     * Asks the underlying subscriber to stop and blocks until it has terminated.
     *
     * <p>NOTE(review): awaitTerminated() is expected to throw IllegalStateException
     * when the service ended in a failed state - confirm callers tolerate that.
     */
    @Override
    public void close() {
        this.underlying.stopAsync().awaitTerminated();
    }
}

