/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.scalecube.rpc;

import io.netty.buffer.ByteBuf;
import io.netty.util.internal.ThreadLocalRandom;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import io.scalecube.services.Reflect;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.ServiceInfo;
import io.scalecube.services.ServiceMethodDefinition;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.ServiceRegistration;
import io.scalecube.services.ServiceScanner;
import io.scalecube.services.api.Qualifier;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.methods.MethodInfo;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
import io.scalecube.services.transport.api.ServiceTransport;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jctools.maps.NonBlockingHashMap;
import org.jctools.maps.NonBlockingHashSet;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.scalecube.rpc.NoneServiceRegistry;
import org.jetlinks.supports.scalecube.rpc.RpcServiceMethodRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ScalecubeRpcManager
implements RpcManager {
    private static final Logger log = LoggerFactory.getLogger(ScalecubeRpcManager.class);
    private final String id = UUID.randomUUID().toString();
    private static final String SPREAD_ENDPOINT_QUALIFIER = "rpc_edp";
    private static final String SPREAD_FROM_HEADER = "rpc_edp_f";
    static final String DEFAULT_SERVICE_ID = "_default";
    static final String SERVICE_ID_TAG = "_sid";
    static final String SERVICE_NAME_TAG = "_sname";
    static final String REGISTER_TIME_TAG = "_regtime";
    private ExtendedCluster cluster;
    private ServiceCall serviceCall;
    private Supplier<ServiceTransport> transportSupplier;
    private final Map<String, ClusterNode> serverServiceRef = new NonBlockingHashMap();
    private final Map<String, Sinks.Many<ServiceEvent>> listener = new NonBlockingHashMap();
    private final List<ServiceRegistration> localRegistrations = new CopyOnWriteArrayList<ServiceRegistration>();
    private final ServiceMethodRegistry methodRegistry = new RpcServiceMethodRegistry();
    private ServiceTransport transport;
    private ServerTransport serverTransport;
    private String externalHost;
    private Integer externalPort;
    private String contentType = "application/json";
    private Disposable syncJob = Disposables.disposed();
    private final Disposable.Composite disposable = Disposables.composite();

    public ScalecubeRpcManager() {
        this(null, null);
    }

    public ScalecubeRpcManager(ExtendedCluster cluster, Supplier<ServiceTransport> transport) {
        this.cluster = cluster;
        this.transportSupplier = transport;
    }

    public ScalecubeRpcManager(ScalecubeRpcManager another) {
        this.cluster = another.cluster;
        this.transportSupplier = another.transportSupplier;
        this.externalHost = another.externalHost;
        this.externalPort = another.externalPort;
        this.contentType = another.contentType;
    }

    public String currentServerId() {
        String alias = this.cluster.member().alias();
        String id = this.cluster.member().id();
        return alias == null ? id : alias;
    }

    public ScalecubeRpcManager externalHost(String host) {
        ScalecubeRpcManager m = new ScalecubeRpcManager(this);
        m.externalHost = host;
        return m;
    }

    public ScalecubeRpcManager externalPort(Integer port) {
        ScalecubeRpcManager m = new ScalecubeRpcManager(this);
        m.externalPort = port;
        return m;
    }

    public ScalecubeRpcManager transport(Supplier<ServiceTransport> transportSupplier) {
        ScalecubeRpcManager m = new ScalecubeRpcManager(this);
        m.transportSupplier = transportSupplier;
        return m;
    }

    public ScalecubeRpcManager cluster(ExtendedCluster cluster) {
        ScalecubeRpcManager m = new ScalecubeRpcManager(this);
        m.cluster = cluster;
        return m;
    }

    public ScalecubeRpcManager contentType(String contentType) {
        ScalecubeRpcManager m = new ScalecubeRpcManager(this);
        m.contentType = contentType;
        return m;
    }

    public void startAwait() {
        this.startAsync().block();
    }

    public Mono<Void> startAsync() {
        Objects.requireNonNull(this.transportSupplier);
        Objects.requireNonNull(this.cluster);
        this.cluster.handler(ignore -> new ClusterMessageHandler(){

            public void onMessage(Message message) {
                String from = message.header(ScalecubeRpcManager.SPREAD_FROM_HEADER);
                if (StringUtils.hasText((String)from) && ScalecubeRpcManager.SPREAD_ENDPOINT_QUALIFIER.equals(message.qualifier())) {
                    ScalecubeRpcManager.this.cluster.member(from).ifPresent(member -> ScalecubeRpcManager.this.handleServiceEndpoint(member, (ServiceEndpoint)message.data()));
                }
            }

            public void onGossip(Message gossip) {
                this.onMessage(gossip);
            }

            public void onMembershipEvent(MembershipEvent event) {
                if (event.isLeaving() || event.isRemoved()) {
                    ScalecubeRpcManager.this.memberLeave(event.member());
                }
                if (event.isAdded() || event.isUpdated()) {
                    ScalecubeRpcManager.this.syncRegistration(event.member());
                }
            }
        });
        return this.initTransport(this.transportSupplier.get()).start().doOnNext(trans -> {
            this.transport = trans;
        }).flatMap(trans -> trans.serverTransport(this.methodRegistry).bind()).doOnNext(trans -> {
            this.serverTransport = trans;
        }).then(Mono.fromRunnable(this::start0));
    }

    private ServiceTransport initTransport(ServiceTransport transport) {
        return transport;
    }

    private void start0() {
        this.serviceCall = new ServiceCall().transport(this.transport.clientTransport());
        this.syncRegistration();
        this.disposable.add(Flux.interval((Duration)Duration.ofSeconds(60L)).concatMap(ignore -> this.doSyncRegistration().onErrorResume(err -> Mono.empty())).subscribe());
    }

    public void stopAwait() {
        this.stopAsync().block();
    }

    public Mono<Void> stopAsync() {
        if (this.serverTransport == null || this.transport == null) {
            return Mono.empty();
        }
        this.localRegistrations.clear();
        this.disposable.dispose();
        return Flux.concatDelayError((Publisher[])new Publisher[]{this.doSyncRegistration().onErrorResume(err -> Mono.empty()), this.serverTransport.stop(), this.transport.stop()}).doOnComplete(() -> {
            this.serverTransport = null;
            this.transport = null;
        }).then();
    }

    private Address resolveAddress() {
        if (StringUtils.hasText((String)this.externalHost)) {
            if (this.externalPort != null) {
                return Address.create((String)this.externalHost, (int)this.externalPort);
            }
            return Address.create((String)this.externalHost, (int)this.serverTransport.address().port());
        }
        return ScalecubeRpcManager.prepareAddress(this.serverTransport.address());
    }

    private static Address prepareAddress(Address address) {
        InetAddress inetAddress;
        try {
            inetAddress = InetAddress.getByName(address.host());
        }
        catch (UnknownHostException e) {
            throw Exceptions.propagate((Throwable)e);
        }
        if (inetAddress.isAnyLocalAddress()) {
            return Address.create((String)Address.getLocalIpAddress().getHostAddress(), (int)address.port());
        }
        return Address.create((String)inetAddress.getHostAddress(), (int)address.port());
    }

    private ServiceEndpoint createEndpoint() {
        return ServiceEndpoint.builder().id(this.id).address(this.resolveAddress()).contentTypes(DataCodec.getAllContentTypes()).serviceRegistrations(this.localRegistrations).build();
    }

    private synchronized void syncRegistration(Member member) {
        this.cluster.send(member, Message.withData((Object)this.createEndpoint()).header(SPREAD_FROM_HEADER, this.cluster.member().id()).qualifier(SPREAD_ENDPOINT_QUALIFIER).build()).subscribe();
    }

    private Mono<Void> doSyncRegistration() {
        ServiceEndpoint endpoint = this.createEndpoint();
        log.debug("Synchronization registration : {}", (Object)endpoint);
        return this.cluster.spreadGossip(Message.withData((Object)endpoint).header(SPREAD_FROM_HEADER, this.cluster.member().id()).qualifier(SPREAD_ENDPOINT_QUALIFIER).build()).then();
    }

    private synchronized void syncRegistration() {
        if (this.cluster == null) {
            return;
        }
        if (!this.syncJob.isDisposed()) {
            this.syncJob.dispose();
        }
        this.syncJob = Mono.delay((Duration)Duration.ofMillis(200L)).flatMap(ignore -> this.doSyncRegistration()).subscribe();
    }

    public <T> Disposable registerService(String service, T rpcService) {
        ServiceInfo serviceInfo = ServiceInfo.fromServiceInstance(rpcService).errorMapper((ServiceProviderErrorMapper)DefaultErrorMapper.INSTANCE).dataDecoder((msg, type) -> {
            if (type.isAssignableFrom(ByteBuf.class) && msg.hasData(ByteBuf.class)) {
                return ServiceMessage.from((ServiceMessage)msg).data(msg.data()).build();
            }
            return (ServiceMessage)ServiceMessageDataDecoder.INSTANCE.apply(msg, type);
        }).tag(SERVICE_ID_TAG, service).build();
        this.methodRegistry.registerService(serviceInfo);
        List registrations = ServiceScanner.scanServiceInfo((ServiceInfo)serviceInfo).stream().map(ref -> {
            HashMap<String, String> tags = new HashMap<String, String>(ref.tags());
            tags.put(SERVICE_ID_TAG, service);
            tags.put(SERVICE_NAME_TAG, ref.namespace());
            tags.put(REGISTER_TIME_TAG, String.valueOf(System.currentTimeMillis()));
            return new ServiceRegistration(ScalecubeRpcManager.createMethodQualifier(service, ref.namespace()), tags, ref.methods());
        }).collect(Collectors.toList());
        this.localRegistrations.addAll(registrations);
        this.syncRegistration();
        log.debug("register rpc service {}", (Object)serviceInfo);
        return () -> {
            this.localRegistrations.removeAll(registrations);
            this.syncRegistration();
        };
    }

    public <T> Disposable registerService(T rpcService) {
        return this.registerService(DEFAULT_SERVICE_ID, rpcService);
    }

    public <I> Flux<RpcService<I>> getServices(Class<I> service) {
        return Flux.fromIterable(this.serverServiceRef.entrySet()).flatMapIterable(e -> ((ClusterNode)e.getValue()).getApiCalls(service));
    }

    public <I> Mono<RpcService<I>> selectService(Class<I> service) {
        ArrayList calls = new ArrayList(this.serverServiceRef.size());
        for (Map.Entry<String, ClusterNode> entry : this.serverServiceRef.entrySet()) {
            calls.addAll(entry.getValue().getApiCalls(service));
        }
        if (calls.isEmpty()) {
            return Mono.empty();
        }
        if (calls.size() == 1) {
            return Mono.just(calls.get(0));
        }
        return Mono.just(calls.get(ThreadLocalRandom.current().nextInt(calls.size())));
    }

    public <I> Flux<RpcService<I>> getServices(String id, Class<I> service) {
        return Flux.fromIterable(this.serverServiceRef.entrySet()).flatMapIterable(e -> ((ClusterNode)e.getValue()).getApiCalls(id, service));
    }

    public <I> Mono<I> getService(String serverNodeId, Class<I> service) {
        return this.getService(serverNodeId, DEFAULT_SERVICE_ID, service);
    }

    public <I> Mono<I> getService(String serverNodeId, String serviceId, Class<I> service) {
        return Mono.fromSupplier(() -> {
            ClusterNode node = this.serverServiceRef.get(serverNodeId);
            if (node == null) {
                return null;
            }
            return node.getApiCall(serviceId, service).service();
        });
    }

    public <I> Flux<ServiceEvent> listen(Class<I> service) {
        String name = Reflect.serviceName(service);
        return this.listener.computeIfAbsent(name, ignore -> Sinks.many().multicast().onBackpressureBuffer()).asFlux();
    }

    private void memberLeave(Member member) {
        String id = member.alias() == null ? member.id() : member.alias();
        ClusterNode ref = this.serverServiceRef.remove(id);
        if (null != ref) {
            this.fireEvent(ref.services, id, ServiceEvent.Type.removed);
        }
        log.debug("remove service endpoint [{}] ", (Object)member);
    }

    private void fireEvent(Collection<ServiceRegistration> services, String memberId, ServiceEvent.Type type) {
        for (ServiceRegistration service : services) {
            String serviceName = service.tags().getOrDefault(SERVICE_NAME_TAG, service.namespace());
            Sinks.Many<ServiceEvent> sink = this.listener.get(serviceName);
            if (sink == null || sink.currentSubscriberCount() <= 0) continue;
            String id = service.tags().getOrDefault(SERVICE_ID_TAG, DEFAULT_SERVICE_ID);
            sink.emitNext((Object)new ServiceEvent(id, serviceName, memberId, type), Reactors.emitFailureHandler());
        }
    }

    private void handleServiceEndpoint(Member member, ServiceEndpoint endpoint) {
        if (this.cluster.member().id().equals(member.id())) {
            return;
        }
        String id = member.alias() == null ? member.id() : member.alias();
        ClusterNode references = this.serverServiceRef.computeIfAbsent(id, ignore -> new ClusterNode());
        references.id = id;
        references.member = member;
        references.rpcAddress = endpoint.address();
        references.register(endpoint);
    }

    static String createMethodQualifier(String serviceId, String qualifier) {
        return Qualifier.asString((String)serviceId, (String)qualifier);
    }

    static class ServiceReferenceInfo {
        private String id;
        private ServiceReference reference;

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ServiceReferenceInfo)) {
                return false;
            }
            ServiceReferenceInfo other = (ServiceReferenceInfo)o;
            if (!other.canEqual(this)) {
                return false;
            }
            String this$id = this.id;
            String other$id = other.id;
            return !(this$id == null ? other$id != null : !this$id.equals(other$id));
        }

        protected boolean canEqual(Object other) {
            return other instanceof ServiceReferenceInfo;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $id = this.id;
            result = result * 59 + ($id == null ? 43 : $id.hashCode());
            return result;
        }

        public ServiceReferenceInfo(String id, ServiceReference reference) {
            this.id = id;
            this.reference = reference;
        }
    }

    static class RpcServiceCall<T>
    implements RpcService<T> {
        private final String serverNodeId;
        private final String id;
        private final String name;
        private final T service;

        public String serverNodeId() {
            return this.serverNodeId;
        }

        public String id() {
            return this.id;
        }

        public String name() {
            return this.name;
        }

        public T service() {
            return this.service;
        }

        public <R> R cast(Class<R> type) {
            return type.cast(this.service);
        }

        public RpcServiceCall(String serverNodeId, String id, String name, T service) {
            this.serverNodeId = serverNodeId;
            this.id = id;
            this.name = name;
            this.service = service;
        }
    }

    class ClusterNode {
        private String id;
        private Member member;
        private Address rpcAddress;
        private final Map<String, Set<ServiceReferenceInfo>> serviceReferencesByQualifier = new NonBlockingHashMap();
        private final List<ServiceRegistration> services = new CopyOnWriteArrayList<ServiceRegistration>();
        private final Map<Class<?>, Map<String, RpcServiceCall<?>>> serviceInstances = new NonBlockingHashMap();

        ClusterNode() {
        }

        public void register(ServiceEndpoint endpoint) {
            ArrayList<String> readyToRemove = new ArrayList<String>(this.serviceReferencesByQualifier.keySet());
            TreeSet<ServiceRegistration> added = new TreeSet<ServiceRegistration>(Comparator.comparing(ServiceRegistration::namespace));
            TreeSet<ServiceRegistration> removed = new TreeSet<ServiceRegistration>(Comparator.comparing(ServiceRegistration::namespace));
            removed.addAll(this.services);
            log.debug("update service endpoint from [{}] : {} ", (Object)this.member, (Object)endpoint);
            for (ServiceRegistration registration : endpoint.serviceRegistrations()) {
                if (!removed.remove(registration)) {
                    added.add(registration);
                }
                for (ServiceMethodDefinition method : registration.methods()) {
                    ServiceReference ref = new ServiceReference(method, registration, endpoint);
                    readyToRemove.remove(ref.qualifier());
                    readyToRemove.remove(ref.oldQualifier());
                    this.populateServiceReferences(ref.qualifier(), ref);
                    this.populateServiceReferences(ref.oldQualifier(), ref);
                }
            }
            for (String qualifier : readyToRemove) {
                this.serviceReferencesByQualifier.remove(qualifier);
            }
            removed.forEach(this.services::remove);
            this.services.addAll(added);
            ScalecubeRpcManager.this.fireEvent(added, this.id, ServiceEvent.Type.added);
            ScalecubeRpcManager.this.fireEvent(removed, this.id, ServiceEvent.Type.removed);
        }

        private boolean populateServiceReferences(String qualifier, ServiceReference serviceReference) {
            String id = serviceReference.tags().getOrDefault(ScalecubeRpcManager.SERVICE_ID_TAG, ScalecubeRpcManager.DEFAULT_SERVICE_ID);
            return this.serviceReferencesByQualifier.computeIfAbsent(qualifier, key -> new NonBlockingHashSet()).add(new ServiceReferenceInfo(id, serviceReference));
        }

        private <I> RpcServiceCall<I> createApiCall(String serviceId, Class<I> clazz) {
            String name = Reflect.serviceName(clazz);
            ServiceCall call = ScalecubeRpcManager.this.serviceCall.router((serviceRegistry, request) -> {
                Set<ServiceReferenceInfo> refs = this.serviceReferencesByQualifier.get(request.qualifier());
                if (refs == null) {
                    return Optional.empty();
                }
                Iterator<ServiceReferenceInfo> iterator = refs.iterator();
                if (iterator.hasNext()) {
                    ServiceReferenceInfo ref = iterator.next();
                    return Optional.of(ref.reference);
                }
                return Optional.empty();
            }).serviceRegistry(NoneServiceRegistry.INSTANCE);
            return new RpcServiceCall<I>(this.id, serviceId, name, this.api(call, serviceId, clazz));
        }

        private <I> List<RpcServiceCall<I>> getApiCalls(Class<I> clazz) {
            return this.getApiCalls(null, clazz);
        }

        private <I> List<RpcServiceCall<I>> getApiCalls(String id, Class<I> clazz) {
            String sName = Reflect.serviceName(clazz);
            ArrayList<RpcServiceCall<I>> registrations = new ArrayList<RpcServiceCall<I>>(1);
            for (ServiceRegistration service : this.services) {
                String name = service.tags().getOrDefault(ScalecubeRpcManager.SERVICE_NAME_TAG, service.namespace());
                String sid = service.tags().getOrDefault(ScalecubeRpcManager.SERVICE_ID_TAG, ScalecubeRpcManager.DEFAULT_SERVICE_ID);
                if (!Objects.equals(name, sName) || id != null && !Objects.equals(sid, id)) continue;
                registrations.add(this.getApiCall(sid, clazz, service));
            }
            return registrations;
        }

        private <I> RpcServiceCall<I> getApiCall(String id, Class<I> clazz, ServiceRegistration registration) {
            return this.serviceInstances.computeIfAbsent(clazz, type -> new NonBlockingHashMap()).computeIfAbsent(id, _id -> this.createApiCall((String)_id, clazz));
        }

        private <I> RpcServiceCall<I> getApiCall(String id, Class<I> clazz) {
            return this.getApiCall(id, clazz, null);
        }

        private Mono<ServiceMessage> toServiceMessage(MethodInfo methodInfo, Object request) {
            ServiceMessage.Builder builder = request instanceof ServiceMessage ? ServiceMessage.from((ServiceMessage)((ServiceMessage)request)).qualifier(methodInfo.qualifier()) : ServiceMessage.builder().qualifier(methodInfo.qualifier()).data(request).dataFormatIfAbsent(ScalecubeRpcManager.this.contentType);
            return TraceHolder.writeContextTo((Object)builder, ServiceMessage.Builder::header).map(ServiceMessage.Builder::build);
        }

        private <T> T api(final ServiceCall serviceCall, String id, final Class<T> serviceInterface) {
            final HashMap genericReturnTypes = new HashMap(Reflect.methodsInfo(serviceInterface));
            for (Map.Entry entry : genericReturnTypes.entrySet()) {
                MethodInfo old = (MethodInfo)entry.getValue();
                entry.setValue(new MethodInfo(Qualifier.asString((String)id, (String)old.serviceName()), old.methodName(), old.parameterizedReturnType(), old.isReturnTypeServiceMessage(), old.communicationMode(), old.parameterCount(), old.requestType(), old.isRequestTypeServiceMessage(), old.isSecured()));
            }
            return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler(){

                @Override
                public Object invoke(Object proxy, Method method, Object[] params) {
                    Optional check = ClusterNode.this.toStringOrEqualsOrHashCode(method.getName(), serviceInterface, params);
                    if (check.isPresent()) {
                        return check.get();
                    }
                    MethodInfo methodInfo = (MethodInfo)genericReturnTypes.get(method);
                    Type returnType = methodInfo.parameterizedReturnType();
                    boolean isServiceMessage = methodInfo.isReturnTypeServiceMessage();
                    Object request = methodInfo.requestType() == Void.TYPE ? null : params[0];
                    switch (methodInfo.communicationMode()) {
                        case FIRE_AND_FORGET: {
                            return ClusterNode.this.toServiceMessage(methodInfo, request).flatMap(arg_0 -> ((ServiceCall)serviceCall).oneWay(arg_0));
                        }
                        case REQUEST_RESPONSE: {
                            return ClusterNode.this.toServiceMessage(methodInfo, request).flatMap(msg -> serviceCall.requestOne(msg, returnType)).transform(ClusterNode.this.asMono(isServiceMessage));
                        }
                        case REQUEST_STREAM: {
                            return ClusterNode.this.toServiceMessage(methodInfo, request).flatMapMany(msg -> serviceCall.requestMany(msg, returnType)).transform(ClusterNode.this.asFlux(isServiceMessage));
                        }
                        case REQUEST_CHANNEL: {
                            return serviceCall.requestBidirectional((Publisher)Flux.from((Publisher)((Publisher)request)).flatMap(data -> ClusterNode.this.toServiceMessage(methodInfo, data)), returnType).transform(ClusterNode.this.asFlux(isServiceMessage));
                        }
                    }
                    throw new IllegalArgumentException("Communication mode is not supported: " + method);
                }
            });
        }

        private Function<Flux<ServiceMessage>, Flux<Object>> asFlux(boolean isReturnTypeServiceMessage) {
            return flux -> isReturnTypeServiceMessage ? flux.cast(Object.class) : flux.map(ServiceMessage::data);
        }

        private Function<Mono<ServiceMessage>, Mono<Object>> asMono(boolean isReturnTypeServiceMessage) {
            return mono -> isReturnTypeServiceMessage ? mono.cast(Object.class) : mono.map(ServiceMessage::data);
        }

        private Optional<Object> toStringOrEqualsOrHashCode(String method, Class<?> serviceInterface, Object ... args) {
            switch (method) {
                case "toString": {
                    return Optional.of(serviceInterface.toString());
                }
                case "equals": {
                    return Optional.of(serviceInterface.equals(args[0]));
                }
                case "hashCode": {
                    return Optional.of(serviceInterface.hashCode());
                }
            }
            return Optional.empty();
        }
    }
}

