/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.wire.BatchPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.SingleConnection;
import com.google.cloud.pubsublite.internal.wire.StreamFactory;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.cloud.pubsublite.proto.PublishResponse;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Optional;

class BatchPublisherImpl
extends SingleConnection<PublishRequest, PublishResponse, Offset>
implements BatchPublisher {
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private Optional<Offset> lastOffset = Optional.empty();

    private BatchPublisherImpl(StreamFactory<PublishRequest, PublishResponse> streamFactory, StreamObserver<Offset> publishCompleteStream, PublishRequest initialRequest) {
        super(streamFactory, publishCompleteStream);
        this.initialize(initialRequest);
    }

    @Override
    public void publish(Collection<PubSubMessage> messages) {
        PublishRequest.Builder builder = PublishRequest.newBuilder();
        builder.getMessagePublishRequestBuilder().addAllMessages(messages);
        this.sendToStream(builder.build());
    }

    @Override
    protected Status handleInitialResponse(PublishResponse response) {
        if (!response.hasInitialResponse()) {
            return Status.FAILED_PRECONDITION.withDescription("First stream response is not an initial response: " + response);
        }
        return Status.OK;
    }

    @Override
    protected Status handleStreamResponse(PublishResponse response) {
        if (response.hasInitialResponse()) {
            return Status.FAILED_PRECONDITION.withDescription("Received duplicate initial response.");
        }
        if (response.hasMessageResponse()) {
            return this.onMessageResponse(response.getMessageResponse());
        }
        return Status.FAILED_PRECONDITION.withDescription("Received response on stream which was neither a message or initial response.");
    }

    private Status onMessageResponse(MessagePublishResponse response) {
        Offset offset = Offset.of(response.getStartCursor().getOffset());
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.lastOffset.isPresent() && offset.value() <= this.lastOffset.get().value()) {
                Status status = Status.FAILED_PRECONDITION.withDescription("Received out of order offsets on stream.");
                return status;
            }
            this.lastOffset = Optional.of(offset);
        }
        this.sendToClient(offset);
        return Status.OK;
    }

    static class Factory
    implements BatchPublisherFactory {
        Factory() {
        }

        @Override
        public BatchPublisherImpl New(StreamFactory<PublishRequest, PublishResponse> streamFactory, StreamObserver<Offset> clientStream, PublishRequest initialRequest) {
            return new BatchPublisherImpl(streamFactory, clientStream, initialRequest);
        }
    }
}

