/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.extract.base.server;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import lombok.Generated;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
import org.apache.dolphinscheduler.extract.base.server.ServerMethodInvoker;
import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class JdkDynamicServerHandler
extends ChannelInboundHandlerAdapter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdkDynamicServerHandler.class);
    private final NettyRemotingServer nettyRemotingServer;
    private final Map<String, ServerMethodInvoker> methodInvokerMap;

    public JdkDynamicServerHandler(NettyRemotingServer nettyRemotingServer) {
        this.nettyRemotingServer = nettyRemotingServer;
        this.methodInvokerMap = new ConcurrentHashMap<String, ServerMethodInvoker>();
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.channel().close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        this.processReceived(ctx.channel(), (Transporter)msg);
    }

    public void registerMethodInvoker(ServerMethodInvoker methodInvoker) {
        Preconditions.checkNotNull((Object)methodInvoker);
        Preconditions.checkNotNull((Object)methodInvoker.getMethodIdentify());
        this.methodInvokerMap.put(methodInvoker.getMethodIdentify(), methodInvoker);
    }

    private void processReceived(Channel channel, Transporter transporter) {
        String methodIdentifier = transporter.getHeader().getMethodIdentifier();
        if ("HEART_BEAT".equals(methodIdentifier)) {
            if (log.isDebugEnabled()) {
                log.debug("server receive heart beat from: host: {}", (Object)ChannelUtils.getRemoteAddress(channel));
            }
            return;
        }
        ServerMethodInvoker methodInvoker = this.methodInvokerMap.get(methodIdentifier);
        try {
            if (methodInvoker == null) {
                log.error("Cannot find the ServerMethodInvoker of : {}", (Object)transporter);
                StandardRpcResponse iRpcResponse = StandardRpcResponse.fail("Cannot find the ServerMethodInvoker of " + methodIdentifier);
                TransporterHeader transporterHeader = TransporterHeader.of(transporter.getHeader().getOpaque(), methodIdentifier);
                Transporter response = Transporter.of(transporterHeader, iRpcResponse);
                channel.writeAndFlush((Object)response);
                return;
            }
            this.nettyRemotingServer.getDefaultExecutor().execute(() -> {
                StandardRpcResponse iRpcResponse;
                try {
                    Object[] args;
                    StandardRpcRequest standardRpcRequest = JsonSerializer.deserialize(transporter.getBody(), StandardRpcRequest.class);
                    if (standardRpcRequest.getArgs() == null || standardRpcRequest.getArgs().length == 0) {
                        args = null;
                    } else {
                        args = new Object[standardRpcRequest.getArgs().length];
                        for (int i = 0; i < standardRpcRequest.getArgs().length; ++i) {
                            args[i] = JsonSerializer.deserialize(standardRpcRequest.getArgs()[i], standardRpcRequest.getArgsTypes()[i]);
                        }
                    }
                    Object result = methodInvoker.invoke(args);
                    iRpcResponse = result == null ? StandardRpcResponse.success(null, null) : StandardRpcResponse.success(JsonSerializer.serialize(result), result.getClass());
                }
                catch (Throwable e) {
                    log.error("Invoke method {} failed, {}.", new Object[]{methodIdentifier, e.getMessage(), e});
                    iRpcResponse = StandardRpcResponse.fail(e.getMessage());
                }
                TransporterHeader transporterHeader = TransporterHeader.of(transporter.getHeader().getOpaque(), methodIdentifier);
                Transporter response = Transporter.of(transporterHeader, iRpcResponse);
                channel.writeAndFlush((Object)response);
            });
        }
        catch (RejectedExecutionException e) {
            log.warn("NettyRemotingServer's thread pool is full, discard msg {} from {}", (Object)transporter, (Object)ChannelUtils.getRemoteAddress(channel));
            StandardRpcResponse iRpcResponse = StandardRpcResponse.fail("NettyRemotingServer's thread pool is full");
            TransporterHeader transporterHeader = TransporterHeader.of(transporter.getHeader().getOpaque(), methodIdentifier);
            Transporter response = Transporter.of(transporterHeader, iRpcResponse);
            channel.writeAndFlush((Object)response);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exceptionCaught : {}", (Object)cause.getMessage(), (Object)cause);
        ctx.channel().close();
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        Channel ch = ctx.channel();
        ChannelConfig config = ch.config();
        if (!ch.isWritable()) {
            if (log.isWarnEnabled()) {
                log.warn("{} is not writable, over high water level : {}", (Object)ch, (Object)config.getWriteBufferHighWaterMark());
            }
            config.setAutoRead(false);
        } else {
            if (log.isWarnEnabled()) {
                log.warn("{} is writable, to low water : {}", (Object)ch, (Object)config.getWriteBufferLowWaterMark());
            }
            config.setAutoRead(true);
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

