/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.rpc;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.beans.ConstructorProperties;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Codec;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.rpc.DisposableService;
import org.jetlinks.core.rpc.Invoker;
import org.jetlinks.core.rpc.RpcDefinition;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.core.utils.BytesUtils;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class DefaultRpcServiceFactory
implements RpcServiceFactory {
    RpcService rpcService;
    static RpcRequestCodec requestCodec = new RpcRequestCodec();

    public <T> DisposableService<T> createProducer(String address, Class<T> serviceInterface) {
        if (!serviceInterface.isInterface()) {
            throw new UnsupportedOperationException("only support interface class");
        }
        HashMap defs = new HashMap();
        for (Method method2 : serviceInterface.getMethods()) {
            RpcDefinition<MethodRpcRequest, ?> def = this.createRpcDefinition(serviceInterface, method2);
            defs.put(method2, def);
        }
        RpcDefinition rpcDef = RpcDefinition.of((String)serviceInterface.getName(), (String)address, (Codec)DirectCodec.INSTANCE, (Codec)new Codec<Payload>(){

            public Class<Payload> forType() {
                return Payload.class;
            }

            public Payload decode(@Nonnull Payload payload) {
                payload.retain();
                return payload;
            }

            public Payload encode(Payload body) {
                return body;
            }
        });
        final Invoker invoker = this.rpcService.createInvoker(rpcDef);
        InvocationHandler handler = (proxy, method, args) -> {
            RpcDefinition definition = (RpcDefinition)defs.get(method);
            Flux flux = invoker.invoke((Object)definition.requestCodec().encode((Object)new MethodRpcRequest(definition.getAddress(), args))).flatMap(payload -> Mono.justOrEmpty((Object)payload.decode((Decoder)definition.responseCodec())));
            if (method.getReturnType().isAssignableFrom(Mono.class)) {
                return Mono.from((Publisher)flux);
            }
            return flux;
        };
        final Object service = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{serviceInterface}, handler);
        return new DisposableService<T>(){

            public T getService() {
                return service;
            }

            public void dispose() {
                invoker.dispose();
            }

            public boolean isDisposed() {
                return invoker.isDisposed();
            }
        };
    }

    public <T> Disposable createConsumer(String address, Class<T> serviceInterface, final T instance) {
        HashMap<String, 3> handlers = new HashMap<String, 3>();
        for (final Method declaredMethod : serviceInterface.getDeclaredMethods()) {
            final RpcDefinition<MethodRpcRequest, ?> definition = this.createRpcDefinition(instance.getClass(), declaredMethod);
            handlers.put(definition.getAddress(), new Function<Payload, Publisher<Payload>>(){

                @Override
                public Publisher<Payload> apply(Payload payload) {
                    MethodRpcRequest request = (MethodRpcRequest)payload.decode((Decoder)definition.requestCodec());
                    Object result = declaredMethod.invoke(instance, request.getArgs());
                    Codec codec = definition.responseCodec();
                    if (result instanceof Mono) {
                        return ((Mono)result).map(res -> codec.encode(res));
                    }
                    if (result instanceof Flux) {
                        return ((Flux)result).map(res -> codec.encode(res));
                    }
                    return Mono.just((Object)codec.encode(result));
                }
            });
        }
        return this.rpcService.listen(RpcDefinition.of((String)address, (String)address, (Codec)requestCodec, (Codec)DirectCodec.INSTANCE), (addr, method) -> (Publisher)((Function)handlers.get(method.getT1())).apply(method.getT2()));
    }

    private RpcDefinition<MethodRpcRequest, ?> createRpcDefinition(Class<?> type, Method method) {
        int count = method.getParameterCount();
        ArrayList<Codec<Object>> codecs = new ArrayList<Codec<Object>>();
        StringBuilder methodName = new StringBuilder(method.getName());
        for (int i = 0; i < count; ++i) {
            ResolvableType resolvableType = ResolvableType.forMethodParameter((Method)method, (int)i);
            Class paramType = resolvableType.resolve(Object.class);
            codecs.add((Codec<Object>)Codecs.lookup((ResolvableType)resolvableType));
            methodName.append(paramType.getSimpleName()).append(",");
        }
        MethodRpcRequestCodec codec = new MethodRpcRequestCodec(methodName.toString().getBytes(), codecs);
        ResolvableType returnType = ResolvableType.forMethodReturnType((Method)method, type);
        if (!Publisher.class.isAssignableFrom(returnType.toClass())) {
            throw new UnsupportedOperationException("unsupported return type:" + returnType);
        }
        return RpcDefinition.of((String)type.getName(), (String)methodName.toString(), (Codec)codec, (Codec)Codecs.lookup((ResolvableType)returnType));
    }

    @ConstructorProperties(value={"rpcService"})
    public DefaultRpcServiceFactory(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    static final class MethodRpcRequest {
        private final String method;
        private final Object[] args;

        @ConstructorProperties(value={"method", "args"})
        public MethodRpcRequest(String method, Object[] args) {
            this.method = method;
            this.args = args;
        }

        public String getMethod() {
            return this.method;
        }

        public Object[] getArgs() {
            return this.args;
        }
    }

    static class MethodRpcRequestCodec
    implements Codec<MethodRpcRequest> {
        private final byte[] method;
        private final List<Codec<Object>> parameter;

        public Class<MethodRpcRequest> forType() {
            return MethodRpcRequest.class;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nullable
        public MethodRpcRequest decode(@Nonnull Payload payload) {
            try {
                ByteBuf buf = payload.getBody().skipBytes(4 + this.method.length);
                Object[] args = new Object[this.parameter.size()];
                int i = 0;
                for (Codec<Object> codec : this.parameter) {
                    byte[] lenArr = new byte[4];
                    buf.readBytes(lenArr);
                    int len = BytesUtils.beToInt((byte[])lenArr);
                    ByteBuf data = buf.readBytes(len);
                    args[i++] = Payload.of((ByteBuf)data).decode(codec);
                }
                buf.resetReaderIndex();
                MethodRpcRequest methodRpcRequest = new MethodRpcRequest(new String(this.method), args);
                return methodRpcRequest;
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)payload);
            }
        }

        public Payload encode(MethodRpcRequest body) {
            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + this.method.length);
            buf.writeBytes(BytesUtils.intToBe((int)this.method.length));
            buf.writeBytes(this.method);
            Object[] params = body.getArgs();
            if (params != null) {
                int size = params.length;
                for (int i = 0; i < size; ++i) {
                    ByteBuf data = this.parameter.get(i).encode(params[i]).getBody();
                    buf.writeBytes(BytesUtils.intToBe((int)data.capacity()));
                    buf.writeBytes(data);
                }
            }
            return Payload.of((ByteBuf)buf);
        }

        @ConstructorProperties(value={"method", "parameter"})
        public MethodRpcRequestCodec(byte[] method, List<Codec<Object>> parameter) {
            this.method = method;
            this.parameter = parameter;
        }
    }

    static class RpcRequestCodec
    implements Codec<Tuple2<String, Payload>> {
        RpcRequestCodec() {
        }

        public Class<Tuple2<String, Payload>> forType() {
            return Tuple2.class;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nullable
        public Tuple2<String, Payload> decode(@Nonnull Payload payload) {
            ByteBuf buf = payload.getBody();
            byte[] lenArr = new byte[4];
            buf.readBytes(lenArr);
            int len = BytesUtils.beToInt((byte[])lenArr);
            ByteBuf byteBuf = buf.readBytes(len);
            buf.resetReaderIndex();
            try {
                Tuple2 tuple2 = Tuples.of((Object)byteBuf.toString(StandardCharsets.UTF_8), (Object)payload);
                return tuple2;
            }
            finally {
                ReferenceCountUtil.safeRelease((Object)byteBuf);
            }
        }

        public Payload encode(Tuple2<String, Payload> tuple2) {
            byte[] bytes = ((String)tuple2.getT1()).getBytes();
            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + bytes.length);
            buf.writeBytes(BytesUtils.intToBe((int)bytes.length));
            buf.writeBytes(bytes);
            return Payload.of((ByteBuf)buf);
        }
    }
}

