/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.ipc;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.ipc.IpcDefinition;
import org.jetlinks.core.ipc.IpcInvoker;
import org.jetlinks.supports.ipc.IpcRequest;
import org.jetlinks.supports.ipc.IpcResponse;
import org.jetlinks.supports.ipc.ResponseType;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class EventBusIpcResponder<REQ, RES>
implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(EventBusIpcResponder.class);
    private final EventBus eventBus;
    private final IpcDefinition<REQ, RES> definition;
    private final IpcInvoker<REQ, RES> invoker;
    private final Map<Integer, EmitterProcessor<REQ>> pendingChannel = new ConcurrentHashMap<Integer, EmitterProcessor<REQ>>();
    private final String acceptTopic;
    private Disposable disposable;

    EventBusIpcResponder(EventBus eventBus, IpcDefinition<REQ, RES> definition, IpcInvoker<REQ, RES> invoker) {
        this.eventBus = eventBus;
        this.definition = definition;
        this.invoker = invoker;
        this.acceptTopic = "/_ipc/" + definition.getAddress().replace("/", "-") + "/" + invoker.getName();
        this.init();
    }

    void init() {
        this.disposable = this.eventBus.subscribe(Subscription.builder().subscriberId(this.definition.getAddress()).local().broker().shared().topics(new String[]{this.acceptTopic}).build()).flatMap(this::handleRequest).subscribe();
    }

    private Mono<Void> handleRequest(TopicPayload payload) {
        try {
            return this.handleRequest(IpcRequest.decode((Payload)payload, this.definition.requestCodec())).onErrorResume(err -> {
                log.error(err.getMessage(), err);
                return Mono.empty();
            });
        }
        catch (Throwable e) {
            log.error(e.getMessage(), e);
            return Mono.empty();
        }
    }

    private Mono<Void> handleRequest(IpcRequest<REQ> request) {
        int consumerId = request.getConsumerId();
        int messageId = request.getMessageId();
        log.trace("handle ipc request {} {}", (Object)request.getType(), (Object)messageId);
        switch (request.getType()) {
            case fireAndForget: {
                return this.invoker.fireAndForget(request.getRequest());
            }
            case noArgFireAndForget: {
                return this.invoker.fireAndForget();
            }
            case request: {
                return this.handleInvoke(consumerId, messageId, (Publisher<RES>)this.invoker.request(request.getRequest()));
            }
            case noArgRequest: {
                return this.handleInvoke(consumerId, messageId, (Publisher<RES>)this.invoker.request());
            }
            case requestStream: {
                return this.handleInvoke(consumerId, messageId, (Publisher<RES>)this.invoker.requestStream(request.getRequest()));
            }
            case noArgRequestStream: {
                return this.handleInvoke(consumerId, messageId, (Publisher<RES>)this.invoker.requestStream());
            }
            case requestChannel: {
                this.pendingChannel.computeIfAbsent(messageId, ignore -> {
                    EmitterProcessor processor = EmitterProcessor.create((int)Integer.MAX_VALUE);
                    this.handleInvoke(consumerId, messageId, (Publisher<RES>)this.invoker.requestChannel((Publisher)processor)).subscribe();
                    return processor;
                }).onNext(request.getRequest());
                return Mono.empty();
            }
            case cancel: {
                Optional.ofNullable(this.pendingChannel.remove(messageId)).ifPresent(EmitterProcessor::onComplete);
            }
        }
        return Mono.empty();
    }

    private Mono<Void> handleInvoke(int consumerId, int messageId, Publisher<RES> result) {
        if (result instanceof Mono) {
            return Mono.from(result).switchIfEmpty(Mono.defer(() -> this.doReply(consumerId, messageId, -1, ResponseType.complete, null).then(Mono.empty()))).flatMap(res -> this.doReply(consumerId, messageId, -1, ResponseType.complete, res)).onErrorResume(err -> this.doReply(consumerId, messageId, (Throwable)err));
        }
        AtomicReference<Integer> seqRef = new AtomicReference<Integer>(-1);
        return this.eventBus.publish(this.acceptTopic + "/" + consumerId + "/_reply", (Encoder)DirectCodec.instance(), (Publisher)Flux.from(result).index().map(tp2 -> {
            int seq = ((Long)tp2.getT1()).intValue();
            seqRef.set(seq);
            return Payload.of((ByteBuf)IpcResponse.of(ResponseType.next, seq, messageId, tp2.getT2(), null).toByteBuf((Encoder<Object>)this.definition.responseCodec(), (Encoder<Throwable>)this.definition.errorCodec()));
        })).flatMap(i -> this.doReply(consumerId, messageId, (Integer)seqRef.get(), ResponseType.complete, null)).doOnError(err -> {
            log.warn("reply [{}.{}] error", new Object[]{consumerId, messageId, err});
            this.doReply(consumerId, messageId, (Throwable)err).subscribe();
        }).then();
    }

    private Mono<Void> doReply(int consumerId, int messageId, Throwable throwable) {
        return this.doReply(consumerId, IpcResponse.of(ResponseType.error, -1, messageId, null, throwable).toByteBuf((Encoder<Object>)this.definition.responseCodec(), (Encoder<Throwable>)this.definition.errorCodec()));
    }

    private Mono<Void> doReply(int consumerId, int messageId, int seq, ResponseType responseType, RES response) {
        return this.doReply(consumerId, IpcResponse.of(responseType, seq, messageId, response, null).toByteBuf((Encoder<RES>)this.definition.responseCodec(), (Encoder<Throwable>)this.definition.errorCodec()));
    }

    private Mono<Void> doReply(int consumerId, ByteBuf byteBuf) {
        Payload payload = Payload.of((ByteBuf)byteBuf);
        return this.eventBus.publish(this.acceptTopic + "/" + consumerId + "/_reply", payload).doOnNext(i -> {
            if (i == 0L) {
                log.warn("reply ipc failed,no consumer[{}] listener", (Object)consumerId);
                ReferenceCountUtil.safeRelease((Object)payload);
            }
        }).then();
    }

    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    public void dispose() {
        this.disposable.dispose();
    }
}

