/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.server.grpc;

import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpResponseWriter;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.common.grpc.GrpcJsonMarshaller;
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.common.grpc.protocol.DeframedMessage;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.HttpDecoder;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.internal.common.grpc.GrpcLogUtil;
import com.linecorp.armeria.internal.common.grpc.HttpStreamDeframer;
import com.linecorp.armeria.internal.common.grpc.TransportStatusListener;
import com.linecorp.armeria.internal.server.grpc.AbstractServerCall;
import com.linecorp.armeria.internal.server.grpc.ServerStatusAndMetadata;
import com.linecorp.armeria.internal.shaded.guava.base.Preconditions;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.EventExecutor;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class StreamingServerCall<I, O>
extends AbstractServerCall<I, O>
implements Subscriber<DeframedMessage>,
TransportStatusListener {
    private static final AtomicIntegerFieldUpdater<StreamingServerCall> pendingMessagesUpdater = AtomicIntegerFieldUpdater.newUpdater(StreamingServerCall.class, "pendingMessages");
    private final MethodDescriptor<I, O> method;
    private final StreamMessage<DeframedMessage> deframedRequest;
    private final HttpResponseWriter res;
    private final ServiceRequestContext ctx;
    @Nullable
    private O firstResponse;
    @Nullable
    private Subscription upstream;
    private int pendingRequests;
    private volatile int pendingMessages;

    StreamingServerCall(HttpRequest req, MethodDescriptor<I, O> method, String simpleMethodName, CompressorRegistry compressorRegistry, DecompressorRegistry decompressorRegistry, HttpResponseWriter res, int maxRequestMessageLength, int maxResponseMessageLength, ServiceRequestContext ctx, SerializationFormat serializationFormat, @Nullable GrpcJsonMarshaller jsonMarshaller, boolean unsafeWrapRequestBuffers, ResponseHeaders defaultHeaders, @Nullable GrpcExceptionHandlerFunction exceptionHandler, @Nullable Executor blockingExecutor, boolean autoCompress) {
        super(req, method, simpleMethodName, compressorRegistry, decompressorRegistry, (HttpResponse)res, maxResponseMessageLength, ctx, serializationFormat, jsonMarshaller, unsafeWrapRequestBuffers, defaultHeaders, exceptionHandler, blockingExecutor, autoCompress);
        Objects.requireNonNull(req, "req");
        this.method = Objects.requireNonNull(method, "method");
        this.ctx = Objects.requireNonNull(ctx, "ctx");
        boolean grpcWebText = GrpcSerializationFormats.isGrpcWebText(serializationFormat);
        Objects.requireNonNull(decompressorRegistry, "decompressorRegistry");
        RequestHeaders clientHeaders = req.headers();
        ByteBufAllocator alloc = ctx.alloc();
        HttpStreamDeframer requestDeframer = new HttpStreamDeframer(decompressorRegistry, (RequestContext)ctx, this, exceptionHandler, maxRequestMessageLength, grpcWebText, true).decompressor(StreamingServerCall.clientDecompressor((HttpHeaders)clientHeaders, decompressorRegistry));
        this.deframedRequest = req.decode((HttpDecoder)requestDeframer, alloc);
        requestDeframer.setDeframedStreamMessage(this.deframedRequest);
        this.res = Objects.requireNonNull(res, "res");
    }

    public void request(int numMessages) {
        if (this.ctx.eventLoop().inEventLoop()) {
            this.request0(numMessages);
        } else {
            this.ctx.eventLoop().execute(() -> this.request0(numMessages));
        }
    }

    private void request0(int numMessages) {
        if (this.upstream == null) {
            this.pendingRequests += numMessages;
        } else {
            this.upstream.request((long)numMessages);
        }
    }

    @Override
    public void startDeframing() {
        this.deframedRequest.subscribe((Subscriber)this, (EventExecutor)this.ctx.eventLoop(), new SubscriptionOption[]{SubscriptionOption.WITH_POOLED_OBJECTS});
    }

    public void sendMessage(O message) {
        pendingMessagesUpdater.incrementAndGet(this);
        if (this.ctx.eventLoop().inEventLoop()) {
            this.doSendMessage(message);
        } else {
            this.ctx.eventLoop().execute(() -> this.doSendMessage(message));
        }
    }

    private void doSendMessage(O message) {
        if (this.isCancelled()) {
            return;
        }
        ResponseHeaders responseHeaders = this.responseHeaders();
        Preconditions.checkState((responseHeaders != null ? 1 : 0) != 0, (Object)"sendHeaders has not been called");
        Preconditions.checkState((!this.isCloseCalled() ? 1 : 0) != 0, (Object)"call is closed");
        if (this.firstResponse == null) {
            if (!this.res.tryWrite((Object)responseHeaders)) {
                this.maybeCancel();
                return;
            }
            this.firstResponse = message;
        }
        try {
            if (this.res.tryWrite((Object)this.toPayload(message))) {
                if (!this.method.getType().serverSendsOneMessage()) {
                    this.res.whenConsumed().thenRun(() -> {
                        if (!this.isCloseCalled() && pendingMessagesUpdater.decrementAndGet(this) == 0) {
                            Executor blockingExecutor = this.blockingExecutor();
                            if (blockingExecutor != null) {
                                blockingExecutor.execute(() -> this.invokeOnReady());
                            } else {
                                this.invokeOnReady();
                            }
                        }
                    });
                }
            } else {
                this.maybeCancel();
            }
        }
        catch (Throwable e) {
            this.close(e, true);
        }
    }

    public boolean isReady() {
        return !this.isCloseCalled() && this.pendingMessages == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void doClose(ServerStatusAndMetadata statusAndMetadata) {
        boolean trailersOnly;
        Status status = statusAndMetadata.status();
        Metadata metadata = statusAndMetadata.metadata();
        if (this.firstResponse != null) {
            trailersOnly = false;
        } else {
            ResponseHeaders responseHeaders = this.responseHeaders();
            if (!status.isOk() || responseHeaders == null) {
                trailersOnly = true;
            } else {
                assert (!this.method.getType().serverSendsOneMessage());
                if (this.res.tryWrite((Object)responseHeaders)) {
                    trailersOnly = false;
                } else {
                    statusAndMetadata.shouldCancel();
                    statusAndMetadata.setResponseContent(true);
                    this.closeListener(statusAndMetadata);
                    return;
                }
            }
        }
        this.ctx.logBuilder().responseContent((Object)GrpcLogUtil.rpcResponse(statusAndMetadata, this.firstResponse), null);
        try {
            if (this.res.tryWrite((Object)this.responseTrailers(this.ctx, status, metadata, trailersOnly))) {
                this.res.close();
            }
        }
        finally {
            statusAndMetadata.setResponseContent(false);
            this.closeListener(statusAndMetadata);
        }
    }

    @Override
    @Nullable
    protected O firstResponse() {
        return this.firstResponse;
    }

    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "subscription");
        this.upstream = subscription;
        if (this.pendingRequests > 0) {
            this.upstream.request((long)this.pendingRequests);
            this.pendingRequests = 0;
        }
    }

    public void onNext(DeframedMessage message) {
        this.onRequestMessage(message, false);
    }

    public void onComplete() {
        this.onRequestComplete();
    }

    @Override
    public void onError(Throwable t) {
        if (!this.isCloseCalled() && !(t instanceof AbortedStreamException)) {
            this.close(t, true);
        }
    }

    @Override
    public void transportReportStatus(Status status, Metadata metadata) {
        if (this.isCloseCalled()) {
            return;
        }
        this.closeListener(new ServerStatusAndMetadata(status, metadata, true, true));
    }
}

