/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.BatchPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.MessagePublishRequest;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
import java.util.Collection;

class BatchPublisherImpl
extends SingleConnection<PublishRequest, PublishResponse, MessagePublishResponse>
implements BatchPublisher {
    private final boolean sendSequenceNumbers;

    private BatchPublisherImpl(StreamFactories.PublishStreamFactory streamFactory, ResponseObserver<MessagePublishResponse> publishCompleteStream, PublishRequest initialRequest) {
        super(streamFactory, publishCompleteStream);
        this.initialize(initialRequest);
        this.sendSequenceNumbers = !initialRequest.getInitialRequest().getClientId().isEmpty();
    }

    @Override
    public void publish(Collection<PubSubMessage> messages, PublishSequenceNumber firstSequenceNumber) {
        PublishRequest.Builder builder = PublishRequest.newBuilder();
        MessagePublishRequest.Builder publishRequestBuilder = builder.getMessagePublishRequestBuilder();
        publishRequestBuilder.addAllMessages(messages);
        if (this.sendSequenceNumbers) {
            publishRequestBuilder.setFirstSequenceNumber(firstSequenceNumber.value());
        }
        this.sendToStream(builder.build());
    }

    @Override
    protected void handleInitialResponse(PublishResponse response) throws CheckedApiException {
        CheckedApiPreconditions.checkState(response.hasInitialResponse(), "First stream response is not an initial response: " + response);
    }

    @Override
    protected void handleStreamResponse(PublishResponse response) throws CheckedApiException {
        CheckedApiPreconditions.checkState(!response.hasInitialResponse(), "Received duplicate initial response.");
        CheckedApiPreconditions.checkState(response.hasMessageResponse(), "Received response on stream which was neither a message or initial response.");
        this.sendToClient(response.getMessageResponse());
    }

    static class Factory
    implements BatchPublisherFactory {
        Factory() {
        }

        @Override
        public BatchPublisherImpl New(StreamFactory<PublishRequest, PublishResponse> streamFactory, ResponseObserver<MessagePublishResponse> clientStream, PublishRequest initialRequest) {
            return new BatchPublisherImpl(streamFactory::New, clientStream, initialRequest);
        }
    }
}

