/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.rpc;

import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetlinks.core.NativePayload;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Codec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.rpc.Invoker;
import org.jetlinks.core.rpc.RpcDefinition;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.supports.rpc.RpcRequest;
import org.jetlinks.supports.rpc.RpcResult;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class EventBusRpcService
implements RpcService {
    private static final Logger log = LoggerFactory.getLogger(EventBusRpcService.class);
    private final EventBus eventBus;

    public <REQ, RES> Disposable listen(RpcDefinition<REQ, RES> definition, BiFunction<String, REQ, Publisher<RES>> call) {
        return this.doListen(definition, (s, reqPublisher) -> Flux.from((Publisher)reqPublisher).flatMap(req -> (Publisher)call.apply((String)s, (Object)req)));
    }

    public <RES> Disposable listen(RpcDefinition<Void, RES> definition, Function<String, Publisher<RES>> call) {
        return this.doListen(definition, (topic, request) -> Flux.from((Publisher)request).thenMany((Publisher)call.apply((String)topic)));
    }

    public <REQ, RES> Invoker<REQ, RES> createInvoker(final RpcDefinition<REQ, RES> definition) {
        final String reqTopic = definition.getAddress();
        String reqTopicRes = definition.getAddress() + "/_reply";
        final AtomicLong idInc = new AtomicLong();
        final ConcurrentHashMap request = new ConcurrentHashMap();
        final Disposable disposable = this.eventBus.subscribe(Subscription.of((String)definition.getId(), (String)reqTopicRes, (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).doOnNext(payload -> {
            RpcResult result = RpcResult.parse((Payload)payload);
            FluxSink sink = (FluxSink)request.get(result.getRequestId());
            if (null != sink) {
                sink.next((Object)result);
            }
        }).onErrorContinue((err, obj) -> log.error(err.getMessage(), err)).subscribe();
        return new Invoker<REQ, RES>(){

            public Flux<RES> invoke() {
                return this.invoke(null);
            }

            private Mono<Long> doSend(long id, Publisher<? extends REQ> payload) {
                if (payload instanceof Mono) {
                    return Mono.from(payload).flatMap(req -> EventBusRpcService.this.eventBus.publish(reqTopic, (Payload)RpcRequest.nextAndComplete(id, definition.requestCodec().encode(req))));
                }
                if (payload instanceof Flux) {
                    return ((Mono)Flux.from(payload).map(req -> RpcRequest.next(id, definition.requestCodec().encode(req))).as(req -> EventBusRpcService.this.eventBus.publish(reqTopic, (Publisher)req))).doOnSuccess(v -> EventBusRpcService.this.eventBus.publish(reqTopic, (Payload)RpcRequest.complete(id)).subscribe());
                }
                return EventBusRpcService.this.eventBus.publish(reqTopic, (Payload)RpcRequest.nextAndComplete(id, Payload.voidPayload));
            }

            public Flux<RES> invoke(Publisher<? extends REQ> payload) {
                return Flux.create(sink -> {
                    long id = idInc.incrementAndGet();
                    request.put(id, sink);
                    sink.onDispose(() -> {
                        FluxSink cfr_ignored_0 = (FluxSink)request.remove(id);
                    });
                    log.trace("do invoke rpc:{}", (Object)definition.getAddress());
                    this.doSend(id, payload).doOnNext(l -> {
                        if (l == 0L) {
                            sink.error((Throwable)new UnsupportedOperationException("no rpc service for:" + definition.getAddress()));
                        }
                    }).doOnError(arg_0 -> ((FluxSink)sink).error(arg_0)).subscribe();
                }).handle((res, sink) -> {
                    if (res.getType() == RpcResult.Type.RESULT_AND_COMPLETE) {
                        Object r = definition.responseCodec().decode((Payload)res);
                        if (r != null) {
                            sink.next(r);
                        }
                        sink.complete();
                    } else if (res.getType() == RpcResult.Type.RESULT) {
                        Object r = definition.responseCodec().decode((Payload)res);
                        if (r != null) {
                            sink.next(r);
                        }
                    } else if (res.getType() == RpcResult.Type.COMPLETE) {
                        sink.complete();
                    } else if (res.getType() == RpcResult.Type.ERROR) {
                        Throwable e = (Throwable)definition.errorCodec().decode((Payload)res);
                        if (e != null) {
                            sink.error(e);
                        } else {
                            sink.complete();
                        }
                    }
                }).timeout(Duration.ofSeconds(10L));
            }

            public void dispose() {
                disposable.dispose();
            }
        };
    }

    protected Mono<Void> reply(String topic, RpcResult result) {
        return this.eventBus.publish(topic, (Payload)result).then();
    }

    private <REQ, RES> Disposable doListen(RpcDefinition<REQ, RES> definition, BiFunction<String, Publisher<REQ>, Publisher<RES>> invokeResult) {
        ConcurrentHashMap request = new ConcurrentHashMap();
        return this.eventBus.subscribe(Subscription.of((String)definition.getId(), (String)definition.getAddress(), (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).map(RpcRequest::parse).doOnCancel(request::clear).subscribe(_req -> request.computeIfAbsent(_req.getRequestId(), id -> new PendingRequest((long)id, definition, invokeResult, () -> {
            PendingRequest cfr_ignored_0 = (PendingRequest)request.remove(id);
        })).next((RpcRequest)_req));
    }

    @ConstructorProperties(value={"eventBus"})
    public EventBusRpcService(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    private class PendingRequest<REQ, RES> {
        long requestId;
        String reqTopicRes;
        String reqTopic;
        RpcDefinition<REQ, RES> definition;
        BiFunction<String, Publisher<REQ>, Publisher<RES>> invoker;
        Disposable disposable;
        EmitterProcessor<REQ> processor = EmitterProcessor.create();
        FluxSink<REQ> sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);

        public PendingRequest(long requestId, RpcDefinition<REQ, RES> definition, BiFunction<String, Publisher<REQ>, Publisher<RES>> invoker, Disposable disposable) {
            this.requestId = requestId;
            this.reqTopic = definition.getAddress();
            this.reqTopicRes = definition.getAddress() + "/_reply";
            this.definition = definition;
            this.invoker = invoker;
            this.disposable = disposable;
            Flux.from(invoker.apply(this.reqTopic, (Publisher<REQ>)this.processor)).flatMap(res -> EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.result(requestId, (Payload)NativePayload.of((Object)res, arg_0 -> ((Codec)definition.responseCodec()).encode(arg_0))))).doOnComplete(() -> EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.complete(requestId)).subscribe()).doOnError(e -> {
                log.error(e.getMessage(), e);
                EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.error(requestId, (Payload)NativePayload.of((Object)e, arg_0 -> ((Codec)definition.errorCodec()).encode(arg_0)))).subscribe();
            }).subscribe();
            this.sink.onDispose(disposable);
        }

        void next(RpcRequest req) {
            try {
                if (req.getType() == RpcRequest.Type.COMPLETE) {
                    this.sink.complete();
                    return;
                }
                Object v = this.definition.requestCodec().decode((Payload)req);
                if (v != null) {
                    this.sink.next(v);
                }
                if (req.getType() == RpcRequest.Type.NEXT_AND_END) {
                    this.sink.complete();
                }
            }
            catch (Throwable e) {
                log.error(e.getMessage(), e);
                this.sink.error(e);
            }
        }
    }
}

