/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.cloud.starlight.core.rpc;

import com.baidu.cloud.starlight.api.common.Constants;
import com.baidu.cloud.starlight.api.exception.CodecException;
import com.baidu.cloud.starlight.api.exception.StarlightRpcException;
import com.baidu.cloud.starlight.api.extension.ExtensionLoader;
import com.baidu.cloud.starlight.api.model.MsgBase;
import com.baidu.cloud.starlight.api.model.Request;
import com.baidu.cloud.starlight.api.model.Response;
import com.baidu.cloud.starlight.api.protocol.Protocol;
import com.baidu.cloud.starlight.api.rpc.LocalContext;
import com.baidu.cloud.starlight.api.rpc.Processor;
import com.baidu.cloud.starlight.api.rpc.ServiceRegistry;
import com.baidu.cloud.starlight.api.rpc.callback.RpcCallback;
import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
import com.baidu.cloud.starlight.api.transport.channel.RpcChannel;
import com.baidu.cloud.starlight.api.utils.GenericUtil;
import com.baidu.cloud.starlight.api.utils.LogUtils;
import java.lang.reflect.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientProcessor
implements Processor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientProcessor.class);
    private ThreadPoolFactory threadPoolFactory;

    public ClientProcessor(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    @Override
    public ServiceRegistry getRegistry() {
        throw new StarlightRpcException("Client side is not supported ServiceRegistry currently");
    }

    @Override
    public void process(MsgBase msgBase, RpcChannel context) {
        if (msgBase instanceof Request) {
            LOGGER.error("Received Request message in server side, but is not supported currently");
            throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "Received Request message in server side, but is not supported currently");
        }
        Response response = (Response)msgBase;
        ClientProcessTask processTask = new ClientProcessTask(response, context);
        LogUtils.addLogTimeAttachment(msgBase, "before_thread_execute_time", System.currentTimeMillis());
        this.threadPoolFactory.defaultThreadPool().execute(processTask);
    }

    @Override
    public void close() {
    }

    @Override
    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    @Override
    public Integer waitTaskCount(String serviceKey) {
        return this.threadPoolFactory.defaultThreadPool().getQueue().size();
    }

    @Override
    public Integer processingCount(String serviceKey) {
        return this.threadPoolFactory.defaultThreadPool().getActiveCount();
    }

    @Override
    public Long completeCount(String serviceKey) {
        return this.threadPoolFactory.defaultThreadPool().getCompletedTaskCount();
    }

    @Override
    public Integer allWaitTaskCount() {
        return this.waitTaskCount(null) + this.threadPoolFactory.defaultThreadPool().getActiveCount();
    }

    private class ClientProcessTask
    implements Runnable {
        private Response response;
        private RpcChannel context;

        public ClientProcessTask(Response response, RpcChannel context) {
            this.response = response;
            this.context = context;
        }

        @Override
        public void run() {
            RpcCallback rpcCallback;
            ClassLoader classLoader = (ClassLoader)LocalContext.getContext("thread.classloader").get(this.context.channel().id().asLongText());
            if (classLoader != null) {
                Thread.currentThread().setContextClassLoader(classLoader);
            } else {
                LOGGER.error("Class Loader related to channel {} is null, plz check", (Object)this.context.channel().id().asLongText());
            }
            Object beforeThreadExeTime = this.response.getNoneAdditionKv().get("before_thread_execute_time");
            if (beforeThreadExeTime instanceof Long) {
                LogUtils.addLogTimeAttachment(this.response, "wait_for_thread_cost", System.currentTimeMillis() - (Long)beforeThreadExeTime);
            }
            if ((rpcCallback = this.context.removeCallback(this.response.getId())) == null) {
                LogUtils.timeoutReqAdditionalLog(this.response);
                return;
            }
            this.response.setRequest(rpcCallback.getRequest());
            Class<?> returnType = rpcCallback.getRequest().getReturnType();
            if (returnType == null) {
                rpcCallback.onError(new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "The returnType in the request message is empty. Cannot deserialize the response data!"));
                return;
            }
            this.response.setReturnType(returnType);
            Type genericType = rpcCallback.getRequest().getGenericReturnType();
            if (genericType == null) {
                rpcCallback.onError(new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "The genericReturnType in the request message is empty. Cannot deserialize the response data!"));
                return;
            }
            this.response.setGenericReturnType(genericType);
            if (GenericUtil.isGenericCall(rpcCallback.getRequest())) {
                GenericUtil.markGeneric(this.response);
            }
            try {
                if (this.response.getStatus() == Constants.SUCCESS_CODE.intValue() && this.response.getBodyBytes() != null && this.response.getBodyBytes().length > 0) {
                    Protocol protocol = ExtensionLoader.getInstance(Protocol.class).getExtension(this.response.getProtocolName());
                    if (protocol == null) {
                        throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "The response's protocol information is not found, protocol {" + this.response.getProtocolName() + "}");
                    }
                    long beforeDecodeBodyTime = System.currentTimeMillis();
                    LogUtils.addLogTimeAttachment(this.response, "before_decode_body_time", beforeDecodeBodyTime);
                    protocol.getDecoder().decodeBody(this.response);
                    LogUtils.addLogTimeAttachment(this.response, "decode_body_cost", System.currentTimeMillis() - beforeDecodeBodyTime);
                }
            }
            catch (Exception e2) {
                CodecException e2;
                if (e2 instanceof CodecException) {
                    CodecException codecException = (CodecException)e2;
                    e2 = new CodecException(codecException.getCode(), codecException.getMessage() + " " + "The problem is usually caused by\n 1: difference of api.jar between server and client.\n 2: API contains a type that Starlight(stargate) does not support. eg:HashMap.keySet()");
                }
                rpcCallback.onError(e2);
                return;
            }
            rpcCallback.onResponse(this.response);
        }
    }
}

