/*
 * Decompiled with CFR 0.152.
 */
package as.leap.vertx.rpc.impl;

import as.leap.vertx.rpc.RPCClient;
import as.leap.vertx.rpc.RequestProp;
import as.leap.vertx.rpc.VertxRPCException;
import as.leap.vertx.rpc.impl.RPCBase;
import as.leap.vertx.rpc.impl.RPCClientOptions;
import as.leap.vertx.rpc.impl.RPCRequest;
import as.leap.vertx.rpc.impl.RPCResponse;
import as.leap.vertx.rpc.impl.WrapperType;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;

/**
 * Client side of the RPC bridge: exposes a service interface as a
 * {@link Proxy} whose method calls are serialized into an {@link RPCRequest}
 * and sent over the Vert.x event bus to {@code serviceAddress}.
 *
 * <p>The way the reply is handed back depends on the return type of the
 * invoked interface method:
 * <ul>
 *   <li>{@link Observable} - the request is sent on every subscription and the
 *       reply is emitted as a single item followed by completion;</li>
 *   <li>{@link CompletableFuture} - the returned future is completed with the
 *       reply;</li>
 *   <li>{@code void} - the last argument must be a
 *       {@code Handler<AsyncResult<?>>} that receives the reply.</li>
 * </ul>
 *
 * @param <T> the service interface being proxied
 */
public class VertxRPCClient<T> extends RPCBase implements InvocationHandler, RPCClient<T> {
    /** Interface implemented by the proxy; its canonical name is sent as the service name. */
    private final Class<T> service;
    /** Vert.x instance whose event bus carries the requests. */
    private final Vertx vertx;
    /** Options this client was built from; retained but not read again in this class. */
    private final RPCClientOptions<T> options;
    /** Event-bus address every request is sent to. */
    private final String serviceAddress;
    /** Default send timeout, used when a method has no {@link RequestProp} or declares a timeout of 0. */
    private final long timeout;

    /**
     * Creates a client from the given options.
     *
     * @param options source of the wire protocol, Vert.x instance, timeout,
     *                bus address and service interface
     * @throws NullPointerException if {@code options} or its service class is null
     */
    public VertxRPCClient(RPCClientOptions<T> options) {
        super(Objects.requireNonNull(options, "options can not be null.").getWireProtocol());
        this.options = options;
        this.vertx = options.getVertx();
        this.timeout = options.getTimeout();
        this.serviceAddress = options.getBusAddress();
        this.service = options.getServiceClass();
        this.checkBusAddress(this.serviceAddress);
        Objects.requireNonNull(this.service, "service's interface can not be null.");
    }

    /**
     * Creates a dynamic proxy implementing the service interface; every call
     * on it is routed through {@link #invoke(Object, Method, Object[])}.
     *
     * @return a proxy instance of the service interface
     */
    @Override
    @SuppressWarnings("unchecked") // Proxy.newProxyInstance returns an instance implementing 'service'
    public T bindService() {
        return (T) Proxy.newProxyInstance(this.service.getClassLoader(), new Class<?>[]{this.service}, this);
    }

    /**
     * Turns a call on the proxy into an RPC request and dispatches it
     * according to the callback style implied by the method's return type.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the invoked interface method
     * @param args   the call arguments, {@code null} for a no-arg method
     * @return an {@link Observable}, a {@link CompletableFuture} or
     *         {@code null} (void methods), depending on the return type
     * @throws VertxRPCException if the return type is not supported, or a void
     *                           method is not given a trailing {@link Handler}
     * @throws Throwable         any failure raised while serializing the request
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // equals/hashCode/toString are also routed here by java.lang.reflect.Proxy;
        // answer them locally instead of failing on their return type.
        if (method.getDeclaringClass() == Object.class) {
            return this.invokeObjectMethod(proxy, method, args);
        }

        final RPCRequest request = new RPCRequest();
        request.setServiceName(this.service.getCanonicalName());
        request.setMethodName(method.getName());
        request.setArgs(this.encodeArguments(method, args));

        final RPCBase.CallbackType callbackType = this.getCallbackType(method.getReturnType());
        final RequestProperties requestProperties = this.extractRequestProp(method);
        switch (callbackType) {
            case REACTIVE:
                return Observable.create(new ReactiveHandler<Object>() {
                    @Override
                    void execute(Handler<AsyncResult<Object>> handler) throws Exception {
                        VertxRPCClient.this.sendRequest(request, requestProperties, callbackType, handler);
                    }
                });
            case ASYNC_HANDLER:
                this.sendRequest(request, requestProperties, callbackType, this.requireTrailingHandler(method, args));
                return null;
            case COMPLETABLE_FUTURE: {
                CompletableFutureHandler<Object> futureHandler = new CompletableFutureHandler<>();
                this.sendRequest(request, requestProperties, callbackType, futureHandler);
                return futureHandler.future;
            }
            default:
                throw new VertxRPCException("unKnow the type of callback");
        }
    }

    /** Answers the {@code java.lang.Object} methods a proxy forwards to its handler. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            default:
                return "VertxRPCClient proxy for " + this.service.getName() + " at " + this.serviceAddress;
        }
    }

    /**
     * Serializes the non-{@link Handler} arguments into the flat list the
     * request carries: for each argument, the type name followed by its bytes.
     * A {@code null} argument is sent as a {@link WrapperType} holding
     * {@code null} together with the declared parameter type.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // asBytes/isWrapType/WrapperType are called with raw Class, as before
    private List<Object> encodeArguments(Method method, Object[] args) throws Exception {
        Class<?>[] parameterTypes = method.getParameterTypes();
        List<Object> encoded = new ArrayList<>(parameterTypes.length * 2);
        for (int index = 0; index < parameterTypes.length; index++) {
            Class parameterType = parameterTypes[index];
            // Skip callback parameters only. (Testing the other direction would also
            // drop every supertype of Handler, e.g. a parameter declared as Object.)
            if (Handler.class.isAssignableFrom(parameterType)) {
                continue;
            }
            Object argument = args[index];
            if (argument != null) {
                encoded.add(this.isWrapType(parameterType) ? WrapperType.class.getName() : parameterType.getName());
                encoded.add(this.asBytes(argument, parameterType));
            } else {
                encoded.add(WrapperType.class.getName());
                encoded.add(this.asBytes(new WrapperType<Object>(null, parameterType), WrapperType.class));
            }
        }
        return encoded;
    }

    /** Returns the trailing callback of a void method, rejecting a missing or null one. */
    @SuppressWarnings("unchecked") // the element type of the handler cannot be checked at runtime
    private Handler<AsyncResult<Object>> requireTrailingHandler(Method method, Object[] args) {
        Object last = (args == null || args.length == 0) ? null : args[args.length - 1];
        if (!(last instanceof Handler)) {
            throw new VertxRPCException("the last argument of method " + method.getName()
                    + " must be a non-null Handler");
        }
        return (Handler<AsyncResult<Object>>) last;
    }

    /**
     * Maps a method return type to the callback style used for the reply.
     *
     * @throws VertxRPCException if the return type is none of Observable,
     *                           CompletableFuture or void
     */
    private RPCBase.CallbackType getCallbackType(Class<?> returnType) {
        if (Observable.class.isAssignableFrom(returnType)) {
            return RPCBase.CallbackType.REACTIVE;
        }
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return RPCBase.CallbackType.COMPLETABLE_FUTURE;
        }
        if (Void.TYPE.equals(returnType)) {
            return RPCBase.CallbackType.ASYNC_HANDLER;
        }
        throw new VertxRPCException("unKnow the type of callback, for now, we just support Obserable CompletableFuture and Handler of vert.x");
    }

    /**
     * Serializes the request and sends it to the service address; the reply
     * (or failure) is decoded by a {@link ReplyHandler} and passed to
     * {@code responseHandler}.
     */
    private <E> void sendRequest(RPCRequest request, RequestProperties requestProp,
                                 RPCBase.CallbackType callBackType,
                                 Handler<AsyncResult<E>> responseHandler) throws Exception {
        DeliveryOptions deliveryOptions = new DeliveryOptions();
        deliveryOptions.setSendTimeout(requestProp.getTimeout());
        deliveryOptions.addHeader("callbackType", callBackType.name());
        byte[] requestBytes = this.asBytes(request);
        ReplyHandler<E> replyHandler =
                new ReplyHandler<>(requestProp.getRetryTimes(), requestBytes, deliveryOptions, responseHandler);
        this.vertx.eventBus().send(this.serviceAddress, requestBytes, deliveryOptions, replyHandler);
    }

    /**
     * Rebuilds the exception described by a recipient failure message of the
     * form {@code <exception class name>|<message>}.
     *
     * <p>This method never throws: it runs inside an event-bus reply handler,
     * where an escaping exception would leave the caller without any answer.
     * Whenever the original exception cannot be recreated a
     * {@link VertxRPCException} is returned instead.
     *
     * @param messExceptionString the failure message received from the peer, may be null
     * @return the recreated exception, or a {@link VertxRPCException} fallback
     */
    private Exception getThrowable(String messExceptionString) {
        if (messExceptionString == null) {
            return new VertxRPCException("Remote invocation failed without an error description");
        }
        String[] parts = messExceptionString.split("\\|", 2);
        if (parts.length < 2) {
            // Not in "class|message" form; surface the raw text rather than failing on parts[1].
            return new VertxRPCException(messExceptionString);
        }
        String exceptionClass = parts[0];
        String message = parts[1];
        try {
            // The class name comes from the remote peer: load it without running static
            // initializers and only instantiate genuine Exception types, so a peer cannot
            // trigger arbitrary (String) constructors such as java.io.FileOutputStream.
            Class<?> clazz = Class.forName(exceptionClass, false, VertxRPCClient.class.getClassLoader());
            if (!Exception.class.isAssignableFrom(clazz)) {
                return new VertxRPCException("Invoke method " + message + " throw exception " + exceptionClass);
            }
            return clazz.asSubclass(Exception.class).getConstructor(String.class).newInstance(message);
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return new VertxRPCException("Invoke method " + message + " throw exception " + exceptionClass);
        } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
            return new VertxRPCException(e);
        }
    }

    /**
     * Reads the per-method settings from {@link RequestProp}. Without the
     * annotation, or with a declared timeout of 0, the client-wide timeout is
     * used; otherwise the declared timeout is converted to milliseconds.
     */
    private RequestProperties extractRequestProp(Method method) {
        RequestProp requestProp = method.getAnnotation(RequestProp.class);
        if (requestProp == null) {
            return new RequestProperties(this.timeout, 0);
        }
        long effectiveTimeout = requestProp.timeout() == 0
                ? this.timeout
                : requestProp.timeUnit().toMillis(requestProp.timeout());
        return new RequestProperties(effectiveTimeout, requestProp.retry());
    }

    /** Immutable per-request settings: send timeout and number of retries on timeout. */
    private static final class RequestProperties {
        private final long timeout;
        private final int retryTimes;

        RequestProperties(long timeout, int retryTimes) {
            this.timeout = timeout;
            this.retryTimes = retryTimes;
        }

        long getTimeout() {
            return this.timeout;
        }

        int getRetryTimes() {
            return this.retryTimes;
        }
    }

    /**
     * Handles the event-bus reply for one request: resends on timeout while
     * retries remain, otherwise decodes the reply (or translates the failure)
     * and notifies the response handler exactly once.
     *
     * @param <E> result type expected by the response handler
     */
    private class ReplyHandler<E> implements Handler<AsyncResult<Message<byte[]>>> {
        private final int retryTimes;
        private final byte[] requestBytes;
        private final DeliveryOptions deliveryOptions;
        private final Handler<AsyncResult<E>> responseHandler;
        /** Number of resends performed so far. */
        private int currentRetryTimes = 0;

        ReplyHandler(int retryTimes, byte[] requestBytes, DeliveryOptions deliveryOptions,
                     Handler<AsyncResult<E>> responseHandler) {
            this.retryTimes = retryTimes;
            this.requestBytes = requestBytes;
            this.deliveryOptions = deliveryOptions;
            this.responseHandler = responseHandler;
        }

        @Override
        public void handle(AsyncResult<Message<byte[]>> message) {
            AsyncResult<E> outcome;
            if (message.succeeded()) {
                outcome = this.decode(message.result());
            } else {
                Throwable cause = message.cause();
                if (this.isTimeout(cause) && this.currentRetryTimes < this.retryTimes) {
                    ++this.currentRetryTimes;
                    VertxRPCClient.this.vertx.eventBus().send(
                            VertxRPCClient.this.serviceAddress, this.requestBytes, this.deliveryOptions, this);
                    return;
                }
                outcome = Future.<E>failedFuture(this.translateFailure(cause));
            }
            // Delivered outside of any try/catch on purpose: an exception thrown by the
            // callback itself must not cause the callback to be invoked a second time.
            this.responseHandler.handle(outcome);
        }

        private boolean isTimeout(Throwable cause) {
            return cause instanceof ReplyException
                    && ((ReplyException) cause).failureType() == ReplyFailure.TIMEOUT;
        }

        /** Recipient failures carry a serialized exception; everything else is passed through. */
        private Throwable translateFailure(Throwable cause) {
            if (cause instanceof ReplyException
                    && ((ReplyException) cause).failureType() == ReplyFailure.RECIPIENT_FAILURE) {
                return VertxRPCClient.this.getThrowable(cause.getMessage());
            }
            return cause;
        }

        /**
         * Deserializes the reply body; any decoding problem becomes a failed
         * result wrapping the cause in a {@link VertxRPCException}.
         */
        @SuppressWarnings("unchecked") // the result type is dictated by the remote type name
        private AsyncResult<E> decode(Message<byte[]> reply) {
            try {
                RPCResponse response = VertxRPCClient.this.asObject(reply.body(), RPCResponse.class);
                // NOTE(review): the response type name is supplied by the peer and is
                // loaded (and initialized) as-is; consider restricting it if peers are untrusted.
                Object result = VertxRPCClient.this.asObject(
                        response.getResponse(), Class.forName(response.getResponseTypeName()));
                Object value = result instanceof WrapperType ? ((WrapperType<?>) result).getValue() : result;
                return Future.succeededFuture((E) value);
            } catch (Exception e) {
                return Future.<E>failedFuture(new VertxRPCException(e));
            }
        }
    }

    /** Adapts an async result onto a {@link CompletableFuture}. */
    private static class CompletableFutureHandler<R> implements Handler<AsyncResult<R>> {
        private final CompletableFuture<R> future = new CompletableFuture<>();

        private CompletableFutureHandler() {
        }

        @Override
        public void handle(AsyncResult<R> event) {
            if (event.succeeded()) {
                this.future.complete(event.result());
            } else {
                this.future.completeExceptionally(event.cause());
            }
        }
    }

    /**
     * {@code OnSubscribe} that performs the remote call when subscribed to.
     * Each subscription gets its own result handler, so concurrent or repeated
     * subscriptions cannot receive each other's replies.
     *
     * @param <R> type of the emitted result
     */
    private abstract static class ReactiveHandler<R> implements Observable.OnSubscribe<R> {

        @Override
        public void call(Subscriber<? super R> subscriber) {
            try {
                this.execute(event -> deliver(subscriber, event));
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }

        /** Emits the single result followed by completion, or the error. */
        private static <V> void deliver(Subscriber<? super V> subscriber, AsyncResult<V> event) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            if (event.failed()) {
                subscriber.onError(event.cause());
                return;
            }
            try {
                subscriber.onNext(event.result());
            } catch (Exception e) {
                // A failing consumer is reported through onError rather than escaping
                // into the event-bus reply handler.
                subscriber.onError(e);
                return;
            }
            subscriber.onCompleted();
        }

        /**
         * Starts the remote call.
         *
         * @param handler receives the reply for this subscription
         * @throws Exception if the request cannot be built or sent
         */
        abstract void execute(Handler<AsyncResult<R>> handler) throws Exception;
    }
}

