/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberFactory;
import com.google.cloud.pubsublite.internal.wire.Predicates;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.MessageResponse;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.stream.Collectors;

class ConnectedSubscriberImpl
extends SingleConnection<SubscribeRequest, SubscribeResponse, List<SequencedMessage>>
implements ConnectedSubscriber {
    private final SubscribeRequest initialRequest;

    private ConnectedSubscriberImpl(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, ResponseObserver<List<SequencedMessage>> clientStream, SubscribeRequest initialRequest) {
        super(streamFactory, clientStream);
        this.initialRequest = initialRequest;
        this.initialize(initialRequest);
    }

    @Override
    public void allowFlow(FlowControlRequest request) {
        Preconditions.checkArgument((request.getAllowedBytes() >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((request.getAllowedMessages() >= 0L ? 1 : 0) != 0);
        this.sendToStream(SubscribeRequest.newBuilder().setFlowControl(request).build());
    }

    @Override
    protected void handleInitialResponse(SubscribeResponse response) throws CheckedApiException {
        CheckedApiPreconditions.checkState(response.hasInitial(), String.format("Received non-initial first response %s on stream with initial request %s.", response, this.initialRequest));
    }

    @Override
    protected void handleStreamResponse(SubscribeResponse response) throws CheckedApiException {
        switch (response.getResponseCase()) {
            case INITIAL: {
                throw new CheckedApiException(String.format("Received duplicate initial response on stream with initial request %s.", this.initialRequest), StatusCode.Code.FAILED_PRECONDITION);
            }
            case MESSAGES: {
                this.onMessages(response.getMessages());
                return;
            }
            case SEEK: {
                throw new CheckedApiException(String.format("Received seek response from client which never sends seek requests %s.", this.initialRequest), StatusCode.Code.FAILED_PRECONDITION);
            }
        }
        throw new CheckedApiException("Received a message on the stream with no case set.", StatusCode.Code.FAILED_PRECONDITION);
    }

    private void onMessages(MessageResponse response) throws CheckedApiException {
        CheckedApiPreconditions.checkState(response.getMessagesCount() > 0, String.format("Received an empty MessageResponse on stream with initial request %s.", this.initialRequest));
        List<SequencedMessage> messages = response.getMessagesList().stream().map(SequencedMessage::fromProto).collect(Collectors.toList());
        CheckedApiPreconditions.checkState(Predicates.isOrdered(messages), String.format("Received out of order messages on the stream with initial request %s.", this.initialRequest));
        this.sendToClient(messages);
    }

    static class Factory
    implements ConnectedSubscriberFactory {
        Factory() {
        }

        @Override
        public ConnectedSubscriberImpl New(StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory, ResponseObserver<List<SequencedMessage>> clientStream, SubscribeRequest initialRequest) {
            return new ConnectedSubscriberImpl(streamFactory, clientStream, initialRequest);
        }
    }
}

