/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.cloud.starlight.core.rpc;

import com.baidu.cloud.starlight.api.exception.CodecException;
import com.baidu.cloud.starlight.api.exception.RpcException;
import com.baidu.cloud.starlight.api.exception.StarlightRpcException;
import com.baidu.cloud.starlight.api.extension.ExtensionLoader;
import com.baidu.cloud.starlight.api.model.MsgBase;
import com.baidu.cloud.starlight.api.model.Request;
import com.baidu.cloud.starlight.api.model.Response;
import com.baidu.cloud.starlight.api.model.RpcResponse;
import com.baidu.cloud.starlight.api.protocol.Protocol;
import com.baidu.cloud.starlight.api.rpc.Processor;
import com.baidu.cloud.starlight.api.rpc.RpcService;
import com.baidu.cloud.starlight.api.rpc.ServiceInvoker;
import com.baidu.cloud.starlight.api.rpc.ServiceRegistry;
import com.baidu.cloud.starlight.api.rpc.callback.RpcCallback;
import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
import com.baidu.cloud.starlight.api.transport.channel.RpcChannel;
import com.baidu.cloud.starlight.api.utils.GenericUtil;
import com.baidu.cloud.starlight.api.utils.LogUtils;
import com.baidu.cloud.starlight.core.rpc.RpcServiceRegistry;
import com.baidu.cloud.starlight.transport.utils.TimerHolder;
import com.baidu.cloud.thirdparty.netty.util.Timeout;
import com.baidu.cloud.thirdparty.netty.util.TimerTask;
import java.lang.reflect.Method;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerProcessor
implements Processor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerProcessor.class);
    private ServiceRegistry serviceRegistry;
    private ThreadPoolFactory threadPoolFactory;

    public ServerProcessor(ServiceRegistry serviceRegistry, ThreadPoolFactory threadPoolFactory) {
        this.serviceRegistry = serviceRegistry;
        this.threadPoolFactory = threadPoolFactory;
    }

    @Override
    public ServiceRegistry getRegistry() {
        return this.serviceRegistry;
    }

    @Override
    public void process(MsgBase msgBase, final RpcChannel context) {
        Integer invokeTimeout;
        if (msgBase instanceof Response) {
            LOGGER.error("Received Response message in server side, but is not supported currently");
            throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "Received Response message in server side, but is not supported currently");
        }
        final Request request = (Request)msgBase;
        final RpcCallback callback = new RpcCallback(){
            private Timeout timeout;
            private volatile boolean isExecuted = false;

            @Override
            public void onResponse(Response response) {
                if (this.timeout != null && !this.timeout.isCancelled()) {
                    this.timeout.cancel();
                }
                if (!this.isExecuted) {
                    this.isExecuted = true;
                    response.setRequest(request);
                    Protocol protocol = ExtensionLoader.getInstance(Protocol.class).getExtension(request.getProtocolName());
                    long beforeTime = System.currentTimeMillis();
                    LogUtils.addLogTimeAttachment(response, "before_encode_body_time", beforeTime);
                    protocol.getEncoder().encodeBody(response);
                    LogUtils.addLogTimeAttachment(response, "encode_body_cost", System.currentTimeMillis() - beforeTime);
                    LogUtils.addLogTimeAttachment(response, "before_io_thread_execute_time", System.currentTimeMillis());
                    context.send(response);
                }
            }

            @Override
            public void onError(Throwable e) {
                if (this.timeout != null && !this.timeout.isCancelled()) {
                    this.timeout.cancel();
                }
                if (!this.isExecuted) {
                    this.isExecuted = true;
                    RpcResponse response = new RpcResponse(request.getId());
                    response.setProtocolName(request.getProtocolName());
                    if (e instanceof RpcException) {
                        ((Response)response).setStatus(((RpcException)e).getCode());
                    } else {
                        ((Response)response).setStatus(StarlightRpcException.INTERNAL_SERVER_ERROR);
                    }
                    ((Response)response).setErrorMsg(e.getMessage());
                    ((Response)response).setRequest(request);
                    Protocol protocol = ExtensionLoader.getInstance(Protocol.class).getExtension(request.getProtocolName());
                    long beforeTime = System.currentTimeMillis();
                    LogUtils.addLogTimeAttachment(response, "before_encode_body_time", beforeTime);
                    protocol.getEncoder().encodeBody(response);
                    LogUtils.addLogTimeAttachment(response, "encode_body_cost", System.currentTimeMillis() - beforeTime);
                    LogUtils.addLogTimeAttachment(response, "before_io_thread_execute_time", System.currentTimeMillis());
                    context.send(response);
                }
            }

            @Override
            public void addTimeout(Timeout timeout) {
                this.timeout = timeout;
            }

            @Override
            public Request getRequest() {
                return request;
            }
        };
        ServiceInvoker serviceInvoker = this.serviceRegistry.discover(request.getServiceName());
        if (serviceInvoker == null) {
            callback.onError(new StarlightRpcException(StarlightRpcException.SERVICE_NOT_FOUND_EXCEPTION, "Service {" + request.getServiceName() + "} not found in provider"));
            return;
        }
        RpcService rpcService = serviceInvoker.getRpcService();
        if (rpcService.getServiceConfig() != null && rpcService.getServiceConfig().getInvokeTimeoutMills() != null && (invokeTimeout = rpcService.getServiceConfig().getInvokeTimeoutMills()) != null && invokeTimeout > 0) {
            Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask(){

                public void run(Timeout timeout) throws Exception {
                    callback.onError(new StarlightRpcException(StarlightRpcException.TIME_OUT_EXCEPTION, "Call service {" + request.getServiceName() + "} method {" + request.getMethodName() + "} time out"));
                }
            }, (long)invokeTimeout.intValue(), TimeUnit.MILLISECONDS);
            callback.addTimeout(timeout);
        }
        ServerProcessTask task = new ServerProcessTask(request, callback, serviceInvoker);
        LogUtils.addLogTimeAttachment(msgBase, "before_thread_execute_time", System.currentTimeMillis());
        this.threadPoolFactory.getThreadPool(rpcService).execute(task);
    }

    @Override
    public void close() {
        if (this.threadPoolFactory != null) {
            this.threadPoolFactory.close();
        }
    }

    @Override
    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
        this.threadPoolFactory = threadPoolFactory;
    }

    @Override
    public Integer waitTaskCount(String serviceKey) {
        RpcService rpcService = this.serviceRegistry.discover(serviceKey).getRpcService();
        if (rpcService != null) {
            return this.threadPoolFactory.getThreadPool(rpcService).getQueue().size();
        }
        return null;
    }

    @Override
    public Integer processingCount(String serviceKey) {
        RpcService rpcService = this.serviceRegistry.discover(serviceKey).getRpcService();
        if (rpcService != null) {
            return this.threadPoolFactory.getThreadPool(rpcService).getActiveCount();
        }
        return null;
    }

    @Override
    public Long completeCount(String serviceKey) {
        RpcService rpcService = this.serviceRegistry.discover(serviceKey).getRpcService();
        if (rpcService != null) {
            return this.threadPoolFactory.getThreadPool(rpcService).getCompletedTaskCount();
        }
        return null;
    }

    @Override
    public Integer allWaitTaskCount() {
        ThreadPoolExecutor defaultThreadPool = this.threadPoolFactory.defaultThreadPool();
        Integer allWaitTaskCount = defaultThreadPool.getQueue().size() + defaultThreadPool.getActiveCount();
        RpcServiceRegistry registry = RpcServiceRegistry.getInstance();
        for (ServiceInvoker serviceInvoker : registry.rpcServices()) {
            ThreadPoolExecutor serviceThreadPool = this.threadPoolFactory.getThreadPool(serviceInvoker.getRpcService());
            if (serviceThreadPool == defaultThreadPool) continue;
            allWaitTaskCount = allWaitTaskCount + serviceThreadPool.getQueue().size() + serviceThreadPool.getActiveCount();
        }
        return allWaitTaskCount;
    }

    private class ServerProcessTask
    implements Runnable {
        private Request request;
        private RpcCallback callback;
        private ServiceInvoker serviceInvoker;

        public ServerProcessTask(Request request, RpcCallback callback, ServiceInvoker serviceInvoker) {
            this.request = request;
            this.callback = callback;
            this.serviceInvoker = serviceInvoker;
        }

        @Override
        public void run() {
            Object beforeThreadExeTime = this.request.getNoneAdditionKv().get("before_thread_execute_time");
            if (beforeThreadExeTime instanceof Long) {
                LogUtils.addLogTimeAttachment(this.request, "wait_for_thread_cost", System.currentTimeMillis() - (Long)beforeThreadExeTime);
            }
            RpcService rpcService = this.serviceInvoker.getRpcService();
            if (!GenericUtil.isGenericCall(this.request)) {
                Method method = rpcService.getMethod(this.request.getMethodName());
                if (method == null) {
                    this.callback.onError(new StarlightRpcException(StarlightRpcException.METHOD_NOT_FOUND_EXCEPTION, "The called method {" + this.request.getMethodName() + "} does not exist"));
                    return;
                }
                Class<?>[] paramTypes = method.getParameterTypes();
                this.request.setParamsTypes(paramTypes);
                this.request.setGenericParamsTypes(method.getGenericParameterTypes());
                this.request.setReturnType(method.getReturnType());
            } else {
                GenericUtil.markGeneric(this.request);
            }
            try {
                Protocol protocol = ExtensionLoader.getInstance(Protocol.class).getExtension(this.request.getProtocolName());
                if (protocol == null) {
                    throw new StarlightRpcException(StarlightRpcException.BAD_REQUEST, "The request's protocol information is not found");
                }
                long beforeTime = System.currentTimeMillis();
                LogUtils.addLogTimeAttachment(this.request, "before_decode_body_time", beforeTime);
                protocol.getDecoder().decodeBody(this.request);
                LogUtils.addLogTimeAttachment(this.request, "decode_body_cost", System.currentTimeMillis() - beforeTime);
            }
            catch (Exception e2) {
                CodecException e2;
                if (e2 instanceof CodecException) {
                    CodecException codecException = (CodecException)e2;
                    e2 = new CodecException(codecException.getCode(), codecException.getMessage() + " " + "The problem is usually caused by\n 1: difference of api.jar between server and client.\n 2: API contains a type that Starlight(stargate) does not support. eg:HashMap.keySet()");
                }
                this.callback.onError(e2);
                return;
            }
            LogUtils.addLogTimeAttachment(this.request, "before_server_filter_exec_time", System.currentTimeMillis());
            this.serviceInvoker.invoke(this.request, this.callback);
        }
    }
}

