/*
 * Decompiled with CFR 0.152.
 */
package io.nats.bridge.support;

import io.nats.bridge.MessageBridge;
import io.nats.bridge.MessageBus;
import io.nats.bridge.messages.Message;
import io.nats.bridge.messages.transform.TransformMessage;
import io.nats.bridge.metrics.Metrics;
import io.nats.bridge.support.MessageBridgeUtil;
import io.nats.bridge.support.MessageBusRestarter;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MessageBridgeBase
implements MessageBridge {
    protected final MessageBus sourceBus;
    protected final MessageBus destinationBus;
    protected final String name;
    protected final boolean transformMessage;
    protected final List<String> transforms;
    protected final Map<String, TransformMessage> transformers;
    protected final List<String> outputTransforms;
    protected final Logger logger = LoggerFactory.getLogger(io.nats.bridge.support.MessageBridgeRequestReply.class);
    protected final Logger runtimeLogger = LoggerFactory.getLogger((String)"runtime");
    protected final MessageBusRestarter messageBusRestarter;

    public MessageBridgeBase(String name, MessageBus sourceBus, MessageBus destinationBus, List<String> inputTransforms, List<String> outputTransforms, Map<String, TransformMessage> transformers) {
        this.sourceBus = sourceBus;
        this.destinationBus = destinationBus;
        this.name = "bridge-" + name.toLowerCase().replace(".", "-").replace(" ", "-");
        this.transforms = inputTransforms;
        this.outputTransforms = outputTransforms;
        boolean inputTransformEnabled = inputTransforms != null && inputTransforms.size() > 0;
        boolean outputTransformEnabled = outputTransforms != null && outputTransforms.size() > 0;
        this.transformMessage = inputTransformEnabled || outputTransformEnabled;
        this.transformers = this.transformMessage ? transformers : Collections.emptyMap();
        this.messageBusRestarter = new MessageBusRestarter(name, this.logger);
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public int process() {
        Optional<Object> receiveMessageFromSourceOption;
        try {
            receiveMessageFromSourceOption = this.sourceBus.receive();
        }
        catch (Exception ex) {
            receiveMessageFromSourceOption = Optional.empty();
            this.restartMessageBus(ex, this.sourceBus);
        }
        return this.doProcess(receiveMessageFromSourceOption);
    }

    protected Message transformMessageIfNeeded(Message receiveMessageFromSource, List<String> transforms) {
        return MessageBridgeUtil.transformMessageIfNeeded(receiveMessageFromSource, transforms, this.transformMessage, this.transformers, this.logger, this.runtimeLogger);
    }

    private int doProcess(Optional<Message> receiveMessageFromSourceOption) {
        if (receiveMessageFromSourceOption.isPresent() && this.runtimeLogger.isTraceEnabled()) {
            this.runtimeLogger.trace("The {} bridge received message with body {}", (Object)this.name(), (Object)receiveMessageFromSourceOption.get().bodyAsString());
        }
        receiveMessageFromSourceOption.ifPresent(this::processMessage);
        return this.processMessageBusQueues(receiveMessageFromSourceOption);
    }

    protected abstract void processMessage(Message var1);

    protected int processBus(MessageBus bus) {
        try {
            return bus.process();
        }
        catch (Exception ex) {
            this.restartMessageBus(ex, bus);
            return bus.process();
        }
    }

    private int processMessageBusQueues(Optional<Message> receiveMessageFromSourceOption) {
        int count = 0;
        if (receiveMessageFromSourceOption.isPresent()) {
            ++count;
        }
        count += this.processBus(this.sourceBus);
        count += this.processBus(this.destinationBus);
        return count += this.otherProcess();
    }

    protected int otherProcess() {
        return 0;
    }

    protected void restartMessageBus(Exception ex, MessageBus messageBus) {
        this.messageBusRestarter.restartMessageBus(ex, messageBus);
    }

    @Override
    public int process(Duration duration) {
        Optional<Message> receiveMessageFromSourceOption;
        try {
            receiveMessageFromSourceOption = this.sourceBus.receive(duration);
        }
        catch (Exception ex) {
            receiveMessageFromSourceOption = Optional.empty();
            this.restartMessageBus(ex, this.sourceBus);
        }
        return this.doProcess(receiveMessageFromSourceOption);
    }

    @Override
    public void close() {
        this.sourceBus.close();
        this.destinationBus.close();
    }

    @Override
    public Metrics sourceMetrics() {
        return this.sourceBus.metrics();
    }

    @Override
    public Metrics destinationMetrics() {
        return this.destinationBus.metrics();
    }

    public static class MessageBridgeRequestReply {
        final Message request;
        final Message reply;

        public MessageBridgeRequestReply(Message request, Message reply) {
            this.request = request;
            this.reply = reply;
        }

        public void respond() {
            this.request.reply(this.reply);
        }
    }
}

