/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.internal.client.grpc;

import com.linecorp.armeria.client.Client;
import com.linecorp.armeria.client.DefaultClientRequestContext;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpHeadersBuilder;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpRequestWriter;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestHeadersBuilder;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageDeframer;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaMessageFramer;
import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
import com.linecorp.armeria.common.logging.RequestLogAccess;
import com.linecorp.armeria.common.logging.RequestLogProperty;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.internal.client.ClientUtil;
import com.linecorp.armeria.internal.common.grpc.ForwardingCompressor;
import com.linecorp.armeria.internal.common.grpc.GrpcLogUtil;
import com.linecorp.armeria.internal.common.grpc.GrpcMessageMarshaller;
import com.linecorp.armeria.internal.common.grpc.GrpcStatus;
import com.linecorp.armeria.internal.common.grpc.HttpStreamReader;
import com.linecorp.armeria.internal.common.grpc.MetadataUtil;
import com.linecorp.armeria.internal.common.grpc.TimeoutHeaderUtil;
import com.linecorp.armeria.internal.common.grpc.TransportStatusListener;
import com.linecorp.armeria.unsafe.grpc.GrpcUnsafeBufferUtil;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.util.ByteProcessor;
import io.netty.util.concurrent.EventExecutor;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.curioswitch.common.protobuf.json.MessageMarshaller;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ArmeriaClientCall<I, O>
extends ClientCall<I, O>
implements ArmeriaMessageDeframer.Listener,
TransportStatusListener {
    private static final Runnable NO_OP = () -> {};
    private static final Logger logger = LoggerFactory.getLogger(ArmeriaClientCall.class);
    private static final ByteProcessor FIND_COLON = new ByteProcessor.IndexOfProcessor(58);
    private static final AtomicIntegerFieldUpdater<ArmeriaClientCall> pendingMessagesUpdater = AtomicIntegerFieldUpdater.newUpdater(ArmeriaClientCall.class, "pendingMessages");
    private final DefaultClientRequestContext ctx;
    private final EndpointGroup endpointGroup;
    private final HttpClient httpClient;
    private final HttpRequestWriter req;
    private final MethodDescriptor<I, O> method;
    private final CallOptions callOptions;
    private final ArmeriaMessageFramer messageFramer;
    private final GrpcMessageMarshaller<I, O> marshaller;
    private final CompressorRegistry compressorRegistry;
    private final HttpStreamReader responseReader;
    private final boolean unsafeWrapResponseBuffers;
    @Nullable
    private final Executor executor;
    private final String advertisedEncodingsHeader;
    private final SerializationFormat serializationFormat;
    @Nullable
    private ClientCall.Listener<O> listener;
    @Nullable
    private O firstResponse;
    private boolean cancelCalled;
    private volatile int pendingMessages;

    ArmeriaClientCall(DefaultClientRequestContext ctx, EndpointGroup endpointGroup, HttpClient httpClient, HttpRequestWriter req, MethodDescriptor<I, O> method, int maxOutboundMessageSizeBytes, int maxInboundMessageSizeBytes, CallOptions callOptions, CompressorRegistry compressorRegistry, DecompressorRegistry decompressorRegistry, SerializationFormat serializationFormat, @Nullable MessageMarshaller jsonMarshaller, boolean unsafeWrapResponseBuffers, String advertisedEncodingsHeader) {
        this.ctx = ctx;
        this.endpointGroup = endpointGroup;
        this.httpClient = httpClient;
        this.req = req;
        this.method = method;
        this.callOptions = callOptions;
        this.compressorRegistry = compressorRegistry;
        this.unsafeWrapResponseBuffers = unsafeWrapResponseBuffers;
        this.advertisedEncodingsHeader = advertisedEncodingsHeader;
        this.serializationFormat = serializationFormat;
        this.messageFramer = new ArmeriaMessageFramer(ctx.alloc(), maxOutboundMessageSizeBytes);
        this.marshaller = new GrpcMessageMarshaller<I, O>(ctx.alloc(), serializationFormat, method, jsonMarshaller, unsafeWrapResponseBuffers);
        this.responseReader = new HttpStreamReader(decompressorRegistry, new ArmeriaMessageDeframer((ArmeriaMessageDeframer.Listener)this, maxInboundMessageSizeBytes, ctx.alloc()), this);
        this.executor = callOptions.getExecutor();
        req.whenComplete().handle((unused1, unused2) -> {
            if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_CONTENT)) {
                ctx.logBuilder().requestContent((Object)GrpcLogUtil.rpcRequest(method), null);
            }
            return null;
        });
    }

    public void start(ClientCall.Listener<O> responseListener, Metadata metadata) {
        Codec compressor;
        Objects.requireNonNull(responseListener, "responseListener");
        Objects.requireNonNull(metadata, "metadata");
        if (this.callOptions.getCompressor() != null) {
            compressor = this.compressorRegistry.lookupCompressor(this.callOptions.getCompressor());
            if (compressor == null) {
                responseListener.onClose(Status.INTERNAL.withDescription("Unable to find compressor by name " + this.callOptions.getCompressor()), new Metadata());
                return;
            }
        } else {
            compressor = Codec.Identity.NONE;
        }
        this.messageFramer.setCompressor(ForwardingCompressor.forGrpc((Compressor)compressor));
        this.prepareHeaders((Compressor)compressor, metadata);
        this.listener = responseListener;
        if (this.callOptions.getDeadline() != null) {
            long remainingMillis = this.callOptions.getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
            if (remainingMillis <= 0L) {
                Status status = Status.DEADLINE_EXCEEDED.augmentDescription("ClientCall started after deadline exceeded: " + this.callOptions.getDeadline());
                this.close(status, new Metadata());
            } else {
                this.ctx.setResponseTimeoutMillis(TimeoutMode.SET_FROM_NOW, remainingMillis);
                this.ctx.setResponseTimeoutHandler(() -> {
                    Status status = Status.DEADLINE_EXCEEDED.augmentDescription("deadline exceeded after " + TimeUnit.MILLISECONDS.toNanos(remainingMillis) + "ns.");
                    this.close(status, new Metadata());
                });
            }
        }
        HttpResponse res = (HttpResponse)ClientUtil.initContextAndExecuteWithFallback((Client)this.httpClient, (DefaultClientRequestContext)this.ctx, (EndpointGroup)this.endpointGroup, (unused, cause) -> HttpResponse.ofFailure((Throwable)GrpcStatus.fromThrowable(cause).withDescription(cause.getMessage()).asRuntimeException()));
        res.subscribe((Subscriber)this.responseReader, (EventExecutor)this.ctx.eventLoop(), new SubscriptionOption[]{SubscriptionOption.WITH_POOLED_OBJECTS});
        res.whenComplete().handleAsync((BiFunction)this.responseReader, (Executor)this.ctx.eventLoop());
        responseListener.onReady();
    }

    public void request(int numMessages) {
        if (this.ctx.eventLoop().inEventLoop()) {
            this.responseReader.request(numMessages);
        } else {
            this.ctx.eventLoop().submit(() -> this.responseReader.request(numMessages));
        }
    }

    public void cancel(@Nullable String message, @Nullable Throwable cause) {
        if (this.ctx.eventLoop().inEventLoop()) {
            this.doCancel(message, cause);
        } else {
            this.ctx.eventLoop().submit(() -> this.doCancel(message, cause));
        }
    }

    private void doCancel(@Nullable String message, @Nullable Throwable cause) {
        if (message == null && cause == null) {
            cause = new CancellationException("Cancelled without a message or cause");
            logger.warn("Cancelling without a message or cause is suboptimal", cause);
        }
        if (this.cancelCalled) {
            return;
        }
        this.cancelCalled = true;
        Status status = Status.CANCELLED;
        if (message != null) {
            status = status.withDescription(message);
        }
        if (cause != null) {
            status = status.withCause(cause);
        }
        this.close(status, new Metadata());
        if (cause == null) {
            this.req.abort();
        } else {
            this.req.abort(cause);
        }
    }

    public void halfClose() {
        if (this.ctx.eventLoop().inEventLoop()) {
            this.req.close();
        } else {
            this.ctx.eventLoop().submit(() -> ((HttpRequestWriter)this.req).close());
        }
    }

    public void sendMessage(I message) {
        pendingMessagesUpdater.incrementAndGet(this);
        if (this.ctx.eventLoop().inEventLoop()) {
            this.doSendMessage(message);
        } else {
            this.ctx.eventLoop().submit(() -> this.doSendMessage(message));
        }
    }

    public boolean isReady() {
        return this.pendingMessages == 0;
    }

    private void doSendMessage(I message) {
        RequestLogAccess log = this.ctx.log();
        if (log.isComplete()) {
            return;
        }
        try {
            if (!log.isAvailable(RequestLogProperty.REQUEST_CONTENT)) {
                this.ctx.logBuilder().requestContent((Object)GrpcLogUtil.rpcRequest(this.method, message), null);
            }
            ByteBuf serialized = this.marshaller.serializeRequest(message);
            this.req.write((Object)this.messageFramer.writePayload(serialized));
            this.req.whenConsumed().thenRun(() -> {
                if (pendingMessagesUpdater.decrementAndGet(this) == 0) {
                    try (SafeCloseable ignored = this.ctx.push();){
                        assert (this.listener != null);
                        this.listener.onReady();
                    }
                    catch (Throwable t) {
                        this.close(GrpcStatus.fromThrowable(t), new Metadata());
                    }
                }
            });
        }
        catch (Throwable t) {
            this.cancel(null, t);
        }
    }

    public synchronized void setMessageCompression(boolean enabled) {
        this.messageFramer.setMessageCompression(enabled);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void messageRead(ArmeriaMessageDeframer.DeframedMessage message) {
        if (GrpcSerializationFormats.isGrpcWeb(this.serializationFormat) && message.type() >> 7 == 1) {
            ByteBuf buf = message.buf();
            assert (buf != null);
            try {
                HttpHeaders trailers = ArmeriaClientCall.parseGrpcWebTrailers(buf);
                if (trailers == null) {
                    this.close(Status.INTERNAL.withDescription("grpc-web trailers malformed: " + buf.toString(StandardCharsets.UTF_8)), new Metadata());
                } else {
                    GrpcStatus.reportStatus(trailers, this.responseReader, this);
                }
            }
            finally {
                buf.release();
            }
            return;
        }
        try {
            O msg = this.marshaller.deserializeResponse(message);
            if (this.firstResponse == null) {
                this.firstResponse = msg;
            }
            ByteBuf buf = message.buf();
            if (this.unsafeWrapResponseBuffers && buf != null) {
                GrpcUnsafeBufferUtil.storeBuffer(buf, msg, (RequestContext)this.ctx);
            }
            try (SafeCloseable ignored = this.ctx.push();){
                assert (this.listener != null);
                this.listener.onMessage(msg);
            }
        }
        catch (Throwable t) {
            this.req.close((Throwable)GrpcStatus.fromThrowable(t).asException());
            throw t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException(t);
        }
        this.notifyExecutor();
    }

    public void endOfStream() {
    }

    @Override
    public void transportReportStatus(Status status, Metadata metadata) {
        this.close(status, metadata);
    }

    private void prepareHeaders(Compressor compressor, Metadata metadata) {
        RequestHeadersBuilder newHeaders = this.req.headers().toBuilder();
        if (compressor != Codec.Identity.NONE) {
            newHeaders.set((CharSequence)GrpcHeaderNames.GRPC_ENCODING, compressor.getMessageEncoding());
        }
        if (!this.advertisedEncodingsHeader.isEmpty()) {
            newHeaders.add((CharSequence)GrpcHeaderNames.GRPC_ACCEPT_ENCODING, this.advertisedEncodingsHeader);
        }
        newHeaders.add((CharSequence)GrpcHeaderNames.GRPC_TIMEOUT, TimeoutHeaderUtil.toHeaderValue(TimeUnit.MILLISECONDS.toNanos(this.ctx.responseTimeoutMillis())));
        MetadataUtil.fillHeaders(metadata, (HttpHeadersBuilder)newHeaders);
        HttpRequest newReq = this.req.withHeaders(newHeaders);
        this.ctx.updateRequest(newReq);
    }

    private void close(Status status, Metadata metadata) {
        Deadline deadline = this.callOptions.getDeadline();
        if (status.getCode() == Status.Code.CANCELLED && deadline != null && deadline.isExpired()) {
            status = Status.DEADLINE_EXCEEDED.augmentDescription("ClientCall was cancelled at or after deadline.");
            metadata = new Metadata();
        }
        this.ctx.logBuilder().responseContent((Object)GrpcLogUtil.rpcResponse(status, this.firstResponse), null);
        if (status.isOk()) {
            this.req.abort();
        } else {
            this.req.abort((Throwable)status.asRuntimeException(metadata));
        }
        this.responseReader.cancel();
        try (SafeCloseable ignored = this.ctx.push();){
            assert (this.listener != null);
            this.listener.onClose(status, metadata);
        }
        this.notifyExecutor();
    }

    private void notifyExecutor() {
        if (this.executor != null) {
            this.executor.execute(NO_OP);
        }
    }

    @Nullable
    private static HttpHeaders parseGrpcWebTrailers(ByteBuf buf) {
        HttpHeadersBuilder trailers = HttpHeaders.builder();
        while (buf.readableBytes() > 0) {
            int endExclusive;
            int start = buf.forEachByte(ByteProcessor.FIND_NON_LINEAR_WHITESPACE);
            if (start == -1) {
                return null;
            }
            if (buf.getByte(start) == 58) {
                buf.skipBytes(1);
                endExclusive = buf.forEachByte(FIND_COLON);
                buf.readerIndex(start);
            } else {
                endExclusive = buf.forEachByte(FIND_COLON);
            }
            if (endExclusive == -1) {
                return null;
            }
            CharSequence name = buf.readCharSequence(endExclusive - start, StandardCharsets.UTF_8);
            buf.readerIndex(endExclusive + 1);
            start = buf.forEachByte(ByteProcessor.FIND_NON_LINEAR_WHITESPACE);
            buf.readerIndex(start);
            endExclusive = buf.forEachByte(ByteProcessor.FIND_CRLF);
            CharSequence value = buf.readCharSequence(endExclusive - start, StandardCharsets.UTF_8);
            trailers.add(name, value.toString());
            start = buf.forEachByte(ByteProcessor.FIND_NON_CRLF);
            if (start != -1) {
                buf.readerIndex(start);
                continue;
            }
            buf.skipBytes(buf.readableBytes());
        }
        return trailers.build();
    }
}

