/*
 * Decompiled with CFR 0.152.
 */
package io.dingodb.net.netty.api;

import io.dingodb.common.Location;
import io.dingodb.common.codec.PrimitiveCodec;
import io.dingodb.common.codec.ProtostuffCodec;
import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.util.Parameters;
import io.dingodb.net.Channel;
import io.dingodb.net.Message;
import io.dingodb.net.MessageListener;
import io.dingodb.net.NetError;
import io.dingodb.net.api.ApiRegistry;
import io.dingodb.net.api.Ping;
import io.dingodb.net.api.annotation.ApiDeclaration;
import io.dingodb.net.error.ApiTerminateException;
import io.dingodb.net.netty.Constant;
import io.dingodb.net.netty.NetServiceConfiguration;
import io.dingodb.net.netty.api.ApiProxy;
import io.dingodb.net.netty.api.FixedChannelProxy;
import io.dingodb.net.netty.api.HandshakeApi;
import io.dingodb.net.netty.api.RandomChannelProxy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApiRegistryImpl
implements ApiRegistry,
InvocationHandler {
    private static final Logger log = LoggerFactory.getLogger(ApiRegistryImpl.class);
    public static final ApiRegistryImpl INSTANCE = new ApiRegistryImpl();
    private final Map<String, Object> definedMap = new ConcurrentHashMap<String, Object>();
    private final Map<String, Method> declarationMap = new ConcurrentHashMap<String, Method>();

    private ApiRegistryImpl() {
        this.register(HandshakeApi.class, HandshakeApi.INSTANCE);
        this.register(Ping.class, Ping.INSTANCE);
    }

    public static ApiRegistryImpl instance() {
        return INSTANCE;
    }

    @Override
    public <T> void register(Class<T> api, T defined) {
        for (Method method : api.getMethods()) {
            ApiDeclaration declaration = method.getAnnotation(ApiDeclaration.class);
            if (declaration == null) continue;
            String name = declaration.name();
            if (name.isEmpty()) {
                name = method.toGenericString();
            }
            this.definedMap.put(name, defined);
            this.declarationMap.put(name, method);
            log.info("Register api: {}, method: {}, defined: {}", api.getName(), name, defined.getClass().getName());
        }
    }

    @Override
    public <T> void register(String name, Method method, T defined) {
        this.definedMap.put(name, defined);
        this.declarationMap.put(name, method);
        log.info("Register function: {}, defined: {}", (Object)name, (Object)defined.getClass().getName());
    }

    @Override
    public <T> T proxy(Class<T> api, Channel channel) {
        return this.proxy(api, channel, (int)NetServiceConfiguration.apiTimeout());
    }

    @Override
    public <T> T proxy(Class<T> api, Channel channel, T defined) {
        return this.proxy(api, new FixedChannelProxy<T>((io.dingodb.net.netty.channel.Channel)channel, defined, 0));
    }

    @Override
    public <T> T proxy(Class<T> api, Channel channel, int timeout) {
        return this.proxy(api, channel, null, timeout);
    }

    @Override
    public <T> T proxy(Class<T> api, Channel channel, T defined, int timeout) {
        return this.proxy(api, new FixedChannelProxy<T>((io.dingodb.net.netty.channel.Channel)channel, defined, timeout));
    }

    @Override
    public <T> T proxy(Class<T> api, Supplier<Location> locationSupplier) {
        return this.proxy(api, locationSupplier, null);
    }

    @Override
    public <T> T proxy(Class<T> api, Supplier<Location> locationSupplier, int timeout) {
        return this.proxy(api, locationSupplier, null, timeout);
    }

    @Override
    public <T> T proxy(Class<T> api, Supplier<Location> locationSupplier, T defined) {
        return this.proxy(api, locationSupplier, defined, 0);
    }

    @Override
    public <T> T proxy(Class<T> api, Supplier<Location> locationSupplier, T defined, int timeout) {
        return this.proxy(api, new RandomChannelProxy<T>(locationSupplier, defined, timeout));
    }

    private <T> T proxy(Class<T> api, ApiProxy apiProxy) {
        return (T)Proxy.newProxyInstance(api.getClassLoader(), new Class[]{api}, (InvocationHandler)apiProxy);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return method.invoke(proxy, args);
    }

    public void invoke(io.dingodb.net.netty.channel.Channel channel, ByteBuffer buffer) {
        String name = PrimitiveCodec.readString(buffer);
        Method method = this.declarationMap.get(name);
        Message message = Constant.API_VOID;
        try {
            if (method == null) {
                NetError.API_NOT_FOUND.throwFormatError(name);
            }
            Object[] args = this.deserializeArgs(channel, buffer, method.getParameterTypes());
            Object result = this.invoke(this.definedMap.get(name), method, args);
            if (result instanceof CompletableFuture) {
                channel.setMessageListener(this.listenCancel(name, (CompletableFuture)result));
                Executors.execute("invoke-api", () -> this.invokeWithFuture(name, channel, (CompletableFuture)result));
                return;
            }
            if (result != null) {
                message = new Message("API_OK", ProtostuffCodec.write(result));
            }
        }
        catch (ApiTerminateException e) {
            log.error("Invoke [{}] from [{}/{}] is termination, message: {}.", name, channel.connection().remoteLocation(), channel.channelId(), e.getMessage(), e);
        }
        catch (InvocationTargetException e) {
            message = this.onError(Parameters.cleanNull(e.getCause(), () -> Parameters.cleanNull(e.getTargetException(), () -> e)), name, channel);
        }
        catch (Throwable e) {
            message = this.onError(e, name, channel);
        }
        channel.send(message);
    }

    private void invokeWithFuture(String name, io.dingodb.net.netty.channel.Channel channel, CompletableFuture<?> future) {
        try {
            channel.send(new Message("API_OK", ProtostuffCodec.write(future.join())));
        }
        catch (CancellationException e) {
            log.warn("Invoke [{}] from [{}/{}] is canceled.", name, channel.connection().remoteLocation(), channel.channelId());
        }
        catch (CompletionException e) {
            channel.send(this.onError(Parameters.cleanNull(e.getCause(), () -> e), name, channel));
        }
        catch (Throwable e) {
            channel.send(this.onError(e, name, channel));
        }
    }

    private Object[] deserializeArgs(io.dingodb.net.netty.channel.Channel channel, ByteBuffer buffer, Class<?>[] parameterTypes) {
        if (parameterTypes == null || parameterTypes.length == 0) {
            return Constant.API_EMPTY_ARGS;
        }
        Object[] args = (Object[])ProtostuffCodec.read(buffer);
        if (parameterTypes[0].isInstance(channel)) {
            args[0] = channel;
        }
        return args;
    }

    private MessageListener listenCancel(String name, CompletableFuture<?> future) {
        return (message, ch) -> {
            if (message.tag().equals("API_CANCEL")) {
                future.cancel(true);
            }
        };
    }

    private Message onError(Throwable error2, String name, io.dingodb.net.netty.channel.Channel channel) {
        log.error("Invoke [{}] from [{}/{}] error, message: {}.", name, channel.connection().remoteLocation(), channel.channelId(), error2.getMessage(), error2);
        return new Message("API_ERROR", ProtostuffCodec.write(error2));
    }
}

