/*
 * Decompiled with CFR 0.152.
 */
package io.nats.bridge.support;

import io.nats.bridge.MessageBus;
import io.nats.bridge.messages.Message;
import io.nats.bridge.messages.transform.TransformMessage;
import io.nats.bridge.support.MessageBridgeBase;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;

public class MessageBridgeRequestReply
extends MessageBridgeBase {
    protected final Queue<MessageBridgeBase.MessageBridgeRequestReply> replyMessageQueue;

    public MessageBridgeRequestReply(String name, MessageBus sourceBus, MessageBus destinationBus, Queue<MessageBridgeBase.MessageBridgeRequestReply> replyMessageQueue, List<String> inputTransforms, List<String> outputTransforms, Map<String, TransformMessage> transformers) {
        super(name, sourceBus, destinationBus, inputTransforms, outputTransforms, transformers);
        this.replyMessageQueue = replyMessageQueue != null ? replyMessageQueue : new LinkedTransferQueue();
    }

    @Override
    protected void processMessage(Message receiveMessageFromSource) {
        Message currentMessageFinal = this.transformMessageIfNeeded(receiveMessageFromSource, this.transforms);
        if (currentMessageFinal == null) {
            return;
        }
        try {
            this.destinationBus.request(currentMessageFinal, replyMessage -> {
                Message replyMessageFinal;
                if (this.runtimeLogger.isTraceEnabled()) {
                    this.runtimeLogger.info("The bridge {} got reply message {} \n for request message {} ", new Object[]{this.name, replyMessage.bodyAsString(), currentMessageFinal});
                }
                this.replyMessageQueue.add(new MessageBridgeBase.MessageBridgeRequestReply(currentMessageFinal, (replyMessageFinal = this.transformMessageIfNeeded((Message)replyMessage, this.outputTransforms)) != null ? replyMessageFinal : replyMessage));
            });
        }
        catch (Exception ex) {
            this.restartMessageBus(ex, this.destinationBus);
            this.destinationBus.request(currentMessageFinal, replyMessage -> {
                Message replyMessageFinal;
                if (this.runtimeLogger.isTraceEnabled()) {
                    this.runtimeLogger.info("The bridge {} got reply message {} \n for request message {} ", new Object[]{this.name, replyMessage.bodyAsString(), currentMessageFinal});
                }
                this.replyMessageQueue.add(new MessageBridgeBase.MessageBridgeRequestReply(currentMessageFinal, (replyMessageFinal = this.transformMessageIfNeeded((Message)replyMessage, this.outputTransforms)) != null ? replyMessageFinal : replyMessage));
            });
        }
    }

    private int processReplies() {
        int i = 0;
        MessageBridgeBase.MessageBridgeRequestReply requestReply = this.replyMessageQueue.poll();
        while (requestReply != null) {
            ++i;
            requestReply.respond();
            requestReply = this.replyMessageQueue.poll();
        }
        return i;
    }

    @Override
    protected int otherProcess() {
        return this.processReplies();
    }
}

