/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri.stream;

import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.ErrorInfo;
import com.google.rpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLSession;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.EndStreamQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
import org.apache.dubbo.rpc.protocol.tri.h12.grpc.GrpcUtils;
import org.apache.dubbo.rpc.protocol.tri.stream.AbstractStream;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;

/**
 * Base class for client-side Triple streams.
 *
 * <p>Outbound operations ({@link #sendHeader}, {@link #sendMessage}, {@link #halfClose},
 * {@link #cancelByLocal}) are turned into queue commands and handed to the {@link TripleWriteQueue}.
 * Inbound HTTP/2 events are received by {@link ClientTransportListener}, which dispatches them to the
 * stream executor and reports results to the {@link ClientStream.Listener}.
 *
 * <p>Once the stream has been reset from this side ({@code rst == true}) every outbound operation that
 * goes through {@code preCheck()} returns a failed future instead of enqueueing a command.
 */
public abstract class AbstractTripleClientStream extends AbstractStream implements ClientStream {

    private static final ErrorTypeAwareLogger LOGGER =
            LoggerFactory.getErrorTypeAwareLogger(AbstractTripleClientStream.class);

    /** Attribute of the parent channel under which the TLS session is looked up. */
    private static final AttributeKey<SSLSession> SSL_SESSION_KEY = AttributeKey.valueOf("ssl-session");

    private final ClientStream.Listener listener;
    protected final TripleWriteQueue writeQueue;
    /** Created when response headers are accepted; {@code null} until then. */
    private Deframer deframer;
    private final Channel parent;
    private final TripleStreamChannelFuture streamChannelFuture;
    /** Set once the end-of-stream command has been written successfully. */
    private boolean halfClosed;
    /** Set once a reset/cancel has been issued from this side. */
    private boolean rst;
    /** Set when the response headers carry a Triple exception code other than "not exists". */
    private boolean isReturnTriException = false;

    /**
     * Creates a stream on top of an existing HTTP/2 stream channel; the channel's parent is used as
     * the connection channel.
     */
    protected AbstractTripleClientStream(
            FrameworkModel frameworkModel,
            Executor executor,
            TripleWriteQueue writeQueue,
            ClientStream.Listener listener,
            Http2StreamChannel http2StreamChannel) {
        super(executor, frameworkModel);
        this.parent = http2StreamChannel.parent();
        this.listener = listener;
        this.writeQueue = writeQueue;
        this.streamChannelFuture = this.initStreamChannel(http2StreamChannel);
    }

    /**
     * Creates a stream on top of a connection channel; the stream channel is obtained through
     * {@link #initStreamChannel(Channel)}.
     */
    protected AbstractTripleClientStream(
            FrameworkModel frameworkModel,
            Executor executor,
            TripleWriteQueue writeQueue,
            ClientStream.Listener listener,
            Channel parent) {
        super(executor, frameworkModel);
        this.parent = parent;
        this.listener = listener;
        this.writeQueue = writeQueue;
        this.streamChannelFuture = this.initStreamChannel(parent);
    }

    /**
     * Supplies the future of the stream channel used by all queue commands of this stream.
     * Called from the constructors, so implementations must not rely on subclass state.
     *
     * @param var1 the channel handed to the constructor (stream channel or connection channel)
     * @return the stream channel future
     */
    protected abstract TripleStreamChannelFuture initStreamChannel(Channel var1);

    /**
     * Enqueues the request headers.
     *
     * @param headers the HTTP/2 request headers
     * @return the write future; a failed future when the write queue is missing or the stream was
     *         reset. A failed write is reported to the listener as an INTERNAL status.
     */
    public ChannelFuture sendHeader(Http2Headers headers) {
        if (this.writeQueue == null) {
            return this.parent.newFailedFuture(new IllegalStateException("Stream already closed"));
        }
        ChannelFuture checkResult = this.preCheck();
        if (!checkResult.isSuccess()) {
            return checkResult;
        }
        HeaderQueueCommand headerCmd = HeaderQueueCommand.createHeaders(this.streamChannelFuture, headers);
        return this.writeQueue.enqueueFuture(headerCmd, this.parent.eventLoop()).addListener(future -> {
            if (!future.isSuccess()) {
                this.transportException(future.cause());
            }
        });
    }

    /** Completes the listener with an INTERNAL status wrapping a transport-level failure. */
    private void transportException(Throwable cause) {
        TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Http2 exception").withCause(cause);
        this.listener.onComplete(status, null, null, false);
    }

    /**
     * Resets the stream from this side with {@link Http2Error#CANCEL}.
     *
     * @param status the reason for the cancellation; not used by this implementation
     * @return the write future, or a failed future when the stream was already reset
     */
    public ChannelFuture cancelByLocal(TriRpcStatus status) {
        ChannelFuture checkResult = this.preCheck();
        if (!checkResult.isSuccess()) {
            return checkResult;
        }
        CancelQueueCommand cmd = CancelQueueCommand.createCommand(this.streamChannelFuture, Http2Error.CANCEL);
        // Mark as reset before enqueueing so later operations are rejected by preCheck().
        this.rst = true;
        return this.writeQueue.enqueue(cmd);
    }

    @Override
    public SocketAddress remoteAddress() {
        return this.parent.remoteAddress();
    }

    @Override
    public SSLSession getSslSession() {
        return this.parent.attr(SSL_SESSION_KEY).get();
    }

    /**
     * Enqueues one request message.
     *
     * @param message      the serialized message
     * @param compressFlag the compression flag passed through to the data command
     * @return the write future, or a failed future when the stream was reset. When the write fails
     *         the stream is cancelled locally and the listener is completed with an INTERNAL status.
     */
    public ChannelFuture sendMessage(byte[] message, int compressFlag) {
        ChannelFuture checkResult = this.preCheck();
        if (!checkResult.isSuccess()) {
            return checkResult;
        }
        DataQueueCommand cmd = DataQueueCommand.create(this.streamChannelFuture, message, false, compressFlag);
        return this.writeQueue.enqueueFuture(cmd, this.parent.eventLoop()).addListener(future -> {
            if (!future.isSuccess()) {
                this.cancelByLocal(TriRpcStatus.INTERNAL
                        .withDescription("Client write message failed")
                        .withCause(future.cause()));
                this.transportException(future.cause());
            }
        });
    }

    /**
     * Forwards the request for {@code n} more messages to the deframer.
     *
     * <p>NOTE(review): the deframer only exists after response headers were accepted; this method
     * does not guard against an earlier call — confirm callers only request after {@code onStart}.
     */
    @Override
    public void request(int n) {
        this.deframer.request(n);
    }

    /**
     * Enqueues the end-of-stream command; {@code halfClosed} is recorded once the write succeeds.
     *
     * @return the write future, or a failed future when the stream was reset
     */
    public ChannelFuture halfClose() {
        ChannelFuture checkResult = this.preCheck();
        if (!checkResult.isSuccess()) {
            return checkResult;
        }
        EndStreamQueueCommand cmd = EndStreamQueueCommand.create(this.streamChannelFuture);
        return this.writeQueue.enqueueFuture(cmd, this.parent.eventLoop()).addListener(future -> {
            if (future.isSuccess()) {
                this.halfClosed = true;
            }
        });
    }

    /**
     * Rejects outbound operations after a local reset.
     *
     * @return a failed future carrying an {@link IOException} when the stream was reset, otherwise a
     *         succeeded future of the parent channel
     */
    private ChannelFuture preCheck() {
        if (this.rst) {
            // Prefer the stream channel (original behavior); fall back to the connection channel so a
            // stream channel that is not available yet cannot turn this check into an NPE.
            Channel streamChannel = this.streamChannelFuture.getNow();
            Channel owner = streamChannel != null ? streamChannel : this.parent;
            return owner.newFailedFuture(new IOException("stream channel has reset"));
        }
        return this.parent.newSucceededFuture();
    }

    /** Creates the listener that receives the inbound HTTP/2 events of this stream. */
    protected H2TransportListener createTransportListener() {
        return new ClientTransportListener();
    }

    /**
     * Parses the {@code :status} pseudo-header.
     *
     * @param headers headers or trailers to read from
     * @return the numeric status, or {@code null} when the header is absent
     * @throws NumberFormatException when the header is not a decimal integer
     */
    private static Integer parseHttpStatus(Http2Headers headers) {
        CharSequence rawStatus = headers.status();
        return rawStatus == null ? null : Integer.valueOf(rawStatus.toString());
    }

    /**
     * Receives inbound HTTP/2 events for the enclosing stream. The {@code on*}/{@code cancelByRemote}
     * callbacks hand their work to the stream executor; the package-private handlers below run
     * inside those tasks.
     */
    class ClientTransportListener extends AbstractH2TransportListener implements H2TransportListener {
        /** First protocol-level problem seen on this stream; once set, payload is no longer decoded. */
        private TriRpcStatus transportError;
        /** Decompressor selected from the response encoding header; {@code null} for identity. */
        private DeCompressor decompressor;
        private boolean headerReceived;
        private Http2Headers trailers;

        ClientTransportListener() {
        }

        /**
         * Resets the stream with {@link Http2Error#NO_ERROR} and completes the listener with
         * {@code status}.
         */
        void handleH2TransportError(TriRpcStatus status) {
            AbstractTripleClientStream.this.writeQueue.enqueue(CancelQueueCommand.createCommand(
                    AbstractTripleClientStream.this.streamChannelFuture, Http2Error.NO_ERROR));
            AbstractTripleClientStream.this.rst = true;
            this.finishProcess(status, null, false);
        }

        /**
         * Completes the listener.
         *
         * @param status               status to report unless a detailed status can be decoded from
         *                             the reserved trailers
         * @param trailers             response trailers, may be {@code null}
         * @param isReturnTriException passed through to the listener
         */
        void finishProcess(TriRpcStatus status, Http2Headers trailers, boolean isReturnTriException) {
            Map<CharSequence, String> reserved = this.filterReservedHeaders(trailers);
            Map<String, Object> attachments = this.headersToMap(
                    trailers, () -> reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getKey()));
            TriRpcStatus statusFromTrailers = this.getStatusFromTrailers(reserved);
            TriRpcStatus detailStatus = statusFromTrailers != null ? statusFromTrailers : status;
            AbstractTripleClientStream.this.listener.onComplete(
                    detailStatus, attachments, reserved, isReturnTriException);
        }

        /**
         * Validates HTTP status presence and content type of a header block.
         *
         * @return {@code null} when the headers are acceptable, otherwise the error status
         */
        private TriRpcStatus validateHeaderStatus(Http2Headers headers) {
            Integer httpStatus = parseHttpStatus(headers);
            if (httpStatus == null) {
                return TriRpcStatus.INTERNAL.withDescription("Missing HTTP status code");
            }
            CharSequence contentType = headers.get(HttpHeaderNames.CONTENT_TYPE.getKey());
            if (contentType == null || !GrpcUtils.isGrpcRequest(contentType.toString())) {
                return TriRpcStatus.fromCode(TriRpcStatus.httpStatusToGrpcCode(httpStatus))
                        .withDescription("invalid content-type: " + contentType);
            }
            return null;
        }

        /**
         * Handles a header block that does not end the stream: validates it, selects the
         * decompressor, creates the deframer and notifies the listener that the response started.
         *
         * @throws RuntimeException (from {@code TriRpcStatus#asException}) when the response encoding
         *                          is not supported; NOTE(review): this escapes into the executor task
         *                          and the listener is not completed on this path.
         */
        void onHeaderReceived(Http2Headers headers) {
            if (this.transportError != null) {
                // appendDescription returns the augmented status; keep it so the detail is not lost.
                this.transportError = this.transportError.appendDescription("headers:" + headers);
                return;
            }
            if (this.headerReceived) {
                this.transportError = TriRpcStatus.INTERNAL.withDescription("Received headers twice");
                return;
            }
            Integer httpStatus = parseHttpStatus(headers);
            if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) {
                // Informational (1xx, including 100 itself) header blocks precede the final response
                // headers and are skipped.
                return;
            }
            this.headerReceived = true;
            this.transportError = this.validateHeaderStatus(headers);

            CharSequence messageEncoding = headers.get(TripleHeaderEnum.GRPC_ENCODING.getKey());
            CharSequence triExceptionCode = headers.get(TripleHeaderEnum.TRI_EXCEPTION_CODE.getKey());
            if (triExceptionCode != null) {
                Integer triExceptionCodeNum = Integer.valueOf(triExceptionCode.toString());
                if (!triExceptionCodeNum.equals(CommonConstants.TRI_EXCEPTION_CODE_NOT_EXISTS)) {
                    AbstractTripleClientStream.this.isReturnTriException = true;
                }
            }
            if (messageEncoding != null) {
                String compressorStr = messageEncoding.toString();
                if (!Identity.IDENTITY.getMessageEncoding().equals(compressorStr)) {
                    DeCompressor compressor =
                            DeCompressor.getCompressor(AbstractTripleClientStream.this.frameworkModel, compressorStr);
                    if (compressor == null) {
                        throw TriRpcStatus.UNIMPLEMENTED
                                .withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr))
                                .asException();
                    }
                    this.decompressor = compressor;
                }
            }

            TriDecoder.Listener decoderListener = new TriDecoder.Listener() {

                @Override
                public void onRawMessage(byte[] data) {
                    AbstractTripleClientStream.this.listener.onMessage(
                            data, AbstractTripleClientStream.this.isReturnTriException);
                }

                @Override
                public void close() {
                    // A recorded transport error takes precedence over whatever status the trailers
                    // advertise; otherwise the error would be silently replaced on this path.
                    TriRpcStatus status = ClientTransportListener.this.transportError != null
                            ? ClientTransportListener.this.transportError
                            : ClientTransportListener.this.statusFromTrailers(ClientTransportListener.this.trailers);
                    ClientTransportListener.this.finishProcess(
                            status,
                            ClientTransportListener.this.trailers,
                            AbstractTripleClientStream.this.isReturnTriException);
                }
            };
            AbstractTripleClientStream.this.deframer = new TriDecoder(this.decompressor, decoderListener);
            AbstractTripleClientStream.this.listener.onStart();
        }

        /**
         * Handles the header block that ends the stream. Without a deframer the listener is completed
         * directly; otherwise the deframer is closed and completion happens in its close callback.
         */
        void onTrailersReceived(Http2Headers trailers) {
            if (this.transportError == null && !this.headerReceived) {
                // Trailers-only response: the trailers double as response headers.
                this.transportError = this.validateHeaderStatus(trailers);
            }
            this.trailers = trailers;
            TriRpcStatus status;
            if (this.transportError == null) {
                status = this.statusFromTrailers(trailers);
            } else {
                this.transportError = this.transportError.appendDescription("trailers: " + trailers);
                status = this.transportError;
            }
            if (AbstractTripleClientStream.this.deframer == null) {
                this.finishProcess(status, trailers, false);
            } else {
                AbstractTripleClientStream.this.deframer.close();
            }
        }

        /**
         * Derives the call status from the trailers: the gRPC status header when present, otherwise
         * an UNKNOWN status (headers were received) or a status inferred from the HTTP status code.
         */
        private TriRpcStatus statusFromTrailers(Http2Headers trailers) {
            Integer intStatus = trailers.getInt(TripleHeaderEnum.STATUS_KEY.getKey());
            TriRpcStatus status = intStatus == null ? null : TriRpcStatus.fromCode(intStatus);
            if (status != null) {
                CharSequence message = trailers.get(TripleHeaderEnum.MESSAGE_KEY.getKey());
                if (message != null) {
                    String description = TriRpcStatus.decodeMessage(message.toString());
                    status = status.withDescription(description);
                }
                return status;
            }
            if (this.headerReceived) {
                return TriRpcStatus.UNKNOWN.withDescription("missing GRPC status in response");
            }
            Integer httpStatus = parseHttpStatus(trailers);
            status = httpStatus != null
                    ? TriRpcStatus.fromCode(TriRpcStatus.httpStatusToGrpcCode(httpStatus))
                    : TriRpcStatus.INTERNAL.withDescription("missing HTTP status code");
            return status.appendDescription("missing GRPC status, inferred error from HTTP status code");
        }

        /**
         * Decodes the detailed status from the reserved trailers, if enabled and present. The status
         * detail entry is removed from {@code metadata} as a side effect.
         *
         * <p>Decompiler note kept from the original: "Removed try catching itself - possible
         * behaviour change."
         *
         * @return the detailed status, or {@code null} when disabled, absent or not parseable
         */
        private TriRpcStatus getStatusFromTrailers(Map<CharSequence, String> metadata) {
            if (metadata == null) {
                return null;
            }
            if (!AbstractStream.getGrpcStatusDetailEnabled()) {
                return null;
            }
            if (!metadata.containsKey(TripleHeaderEnum.STATUS_DETAIL_KEY.getKey())) {
                return null;
            }
            String raw = metadata.remove(TripleHeaderEnum.STATUS_DETAIL_KEY.getKey());
            byte[] statusDetailBin = StreamUtils.decodeASCIIByte(raw);
            // The context class loader captured here is restored in the finally block.
            ClassLoader tccl = Thread.currentThread().getContextClassLoader();
            try {
                Status statusDetail = Status.parseFrom(statusDetailBin);
                List<Any> detailList = statusDetail.getDetailsList();
                Map<Class<?>, Object> classObjectMap = this.tranFromStatusDetails(detailList);
                TriRpcStatus status = TriRpcStatus.fromCode(statusDetail.getCode())
                        .withDescription(TriRpcStatus.decodeMessage(statusDetail.getMessage()));
                DebugInfo debugInfo = (DebugInfo) classObjectMap.get(DebugInfo.class);
                if (debugInfo != null) {
                    String msg = ExceptionUtils.getStackFrameString(debugInfo.getStackEntriesList());
                    status = status.appendDescription(msg);
                }
                return status;
            } catch (IOException ignored) {
                // Best effort: an undecodable detail falls back to the status supplied by the caller.
                return null;
            } finally {
                ClassLoadUtil.switchContextLoader(tccl);
            }
        }

        /**
         * Unpacks the first {@link ErrorInfo} and the first {@link DebugInfo} of the detail list,
         * keyed by their class. Unpack failures are logged and whatever was collected is returned.
         */
        private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
            Map<Class<?>, Object> map = new HashMap<>(detailList.size());
            try {
                for (Any any : detailList) {
                    if (any.is(ErrorInfo.class)) {
                        map.putIfAbsent(ErrorInfo.class, any.unpack(ErrorInfo.class));
                    } else if (any.is(DebugInfo.class)) {
                        map.putIfAbsent(DebugInfo.class, any.unpack(DebugInfo.class));
                    }
                }
            } catch (Throwable t) {
                LOGGER.error("4-14", "", "", "tran from grpc-status-details error", t);
            }
            return map;
        }

        /**
         * Dispatches a header block to the executor. An end-of-stream block is treated as trailers;
         * if this side has not half-closed yet and the stream channel is still active, the stream is
         * reset with {@link Http2Error#CANCEL} first.
         */
        @Override
        public void onHeader(Http2Headers headers, boolean endStream) {
            AbstractTripleClientStream.this.executor.execute(() -> {
                if (!endStream) {
                    this.onHeaderReceived(headers);
                    return;
                }
                if (!AbstractTripleClientStream.this.halfClosed) {
                    Channel channel = AbstractTripleClientStream.this.streamChannelFuture.getNow();
                    if (channel.isActive() && !AbstractTripleClientStream.this.rst) {
                        AbstractTripleClientStream.this.writeQueue.enqueue(CancelQueueCommand.createCommand(
                                AbstractTripleClientStream.this.streamChannelFuture, Http2Error.CANCEL));
                        AbstractTripleClientStream.this.rst = true;
                    }
                }
                this.onTrailersReceived(headers);
            });
        }

        /**
         * Dispatches a data frame to the executor; the buffer is released here when the task cannot
         * be submitted.
         */
        @Override
        public void onData(ByteBuf data, boolean endStream) {
            try {
                AbstractTripleClientStream.this.executor.execute(() -> this.doOnData(data, endStream));
            } catch (Throwable t) {
                ReferenceCountUtil.release(data);
                LOGGER.error("4-14", "", "", "submit onData task failed", t);
            }
        }

        /**
         * Processes one data frame. After a transport error the payload is only collected into the
         * error description (and the stream failed once the description exceeds 512 characters or the
         * stream ends); otherwise the frame is handed to the deframer.
         */
        private void doOnData(ByteBuf data, boolean endStream) {
            if (this.transportError != null) {
                // Keep the augmented status returned by appendDescription.
                this.transportError =
                        this.transportError.appendDescription("Data:" + data.toString(StandardCharsets.UTF_8));
                ReferenceCountUtil.release(data);
                if (this.transportError.description.length() > 512 || endStream) {
                    this.handleH2TransportError(this.transportError);
                }
                return;
            }
            if (!this.headerReceived) {
                // The frame is not passed on to the deframer on this path, so release it here.
                ReferenceCountUtil.release(data);
                this.handleH2TransportError(
                        TriRpcStatus.INTERNAL.withDescription("headers not received before payload"));
                return;
            }
            AbstractTripleClientStream.this.deframer.deframe(data);
        }

        /** Records a CANCELLED transport error for a remote reset and completes the listener. */
        @Override
        public void cancelByRemote(long errorCode) {
            AbstractTripleClientStream.this.executor.execute(() -> {
                this.transportError = TriRpcStatus.CANCELLED.withDescription(
                        "Canceled by remote peer, errorCode=" + errorCode);
                this.finishProcess(this.transportError, null, false);
            });
        }

        /** Notifies the listener, on the executor, that the stream has closed. */
        @Override
        public void onClose() {
            AbstractTripleClientStream.this.executor.execute(AbstractTripleClientStream.this.listener::onClose);
        }
    }
}

