/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.amqpbridge.impl;

import io.vertx.amqpbridge.AmqpBridge;
import io.vertx.amqpbridge.AmqpBridgeOptions;
import io.vertx.amqpbridge.impl.AmqpConsumerImpl;
import io.vertx.amqpbridge.impl.AmqpMessageImpl;
import io.vertx.amqpbridge.impl.AmqpProducerImpl;
import io.vertx.amqpbridge.impl.BridgeMetaDataSupportImpl;
import io.vertx.amqpbridge.impl.MessageTranslatorImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonReceiver;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;

public class AmqpBridgeImpl
implements AmqpBridge {
    private final Vertx vertx;
    private final Context bridgeContext;
    private final AmqpBridgeOptions options;
    private ProtonClient client;
    private ProtonConnection connection;
    private ProtonReceiver replyToConsumer;
    private String replyToConsumerAddress;
    private AmqpProducerImpl replySender;
    private Map<String, Handler<?>> replyToMapping = new ConcurrentHashMap();
    private MessageTranslatorImpl translator = new MessageTranslatorImpl();
    private AtomicBoolean started = new AtomicBoolean();
    private static final Logger LOG = LoggerFactory.getLogger(AmqpBridgeImpl.class);

    public AmqpBridgeImpl(Vertx vertx, AmqpBridgeOptions options) {
        this.vertx = vertx;
        this.options = options;
        this.bridgeContext = vertx.getOrCreateContext();
    }

    @Override
    public void start(String hostname, int port, Handler<AsyncResult<AmqpBridge>> resultHandler) {
        this.start(hostname, port, null, null, resultHandler);
    }

    @Override
    public void start(String hostname, int port, String username, String password, Handler<AsyncResult<AmqpBridge>> resultHandler) {
        this.runOnContext(true, (Handler<Void>)((Handler)v -> this.startImpl(hostname, port, username, password, resultHandler)));
    }

    private void startImpl(String hostname, int port, String username, String password, Handler<AsyncResult<AmqpBridge>> resultHandler) {
        this.client = ProtonClient.create((Vertx)this.vertx);
        this.client.connect((ProtonClientOptions)this.options, hostname, port, username, password, connectResult -> {
            if (connectResult.succeeded()) {
                this.connection = (ProtonConnection)connectResult.result();
                LinkedHashMap<Symbol, String> props = new LinkedHashMap<Symbol, String>();
                props.put(BridgeMetaDataSupportImpl.PRODUCT_KEY, "vertx-amqp-bridge");
                props.put(BridgeMetaDataSupportImpl.VERSION_KEY, BridgeMetaDataSupportImpl.VERSION);
                this.connection.setProperties(props);
                if (this.options.getVhost() != null) {
                    this.connection.setHostname(this.options.getVhost());
                }
                if (this.options.getContainerId() != null) {
                    this.connection.setContainer(this.options.getContainerId());
                }
                this.connection.openHandler(openResult -> {
                    LOG.trace((Object)"Bridge connection open complete");
                    if (openResult.succeeded()) {
                        if (!this.options.isReplyHandlingSupport() || !this.connection.isAnonymousRelaySupported()) {
                            this.started.set(true);
                            resultHandler.handle((Object)Future.succeededFuture((Object)this));
                            return;
                        }
                        this.replySender = new AmqpProducerImpl(this, this.connection, null);
                        this.replyToConsumer = this.connection.createReceiver(null);
                        Source source = (Source)this.replyToConsumer.getSource();
                        source.setDynamic(true);
                        this.replyToConsumer.handler(this::handleIncomingMessageReply);
                        ((ProtonReceiver)this.replyToConsumer.openHandler(replyToConsumerResult -> {
                            if (replyToConsumerResult.succeeded()) {
                                Source remoteSource = (Source)this.replyToConsumer.getRemoteSource();
                                if (remoteSource != null) {
                                    this.replyToConsumerAddress = remoteSource.getAddress();
                                }
                                this.started.set(true);
                                resultHandler.handle((Object)Future.succeededFuture((Object)this));
                            } else {
                                resultHandler.handle((Object)Future.failedFuture((Throwable)replyToConsumerResult.cause()));
                            }
                        })).open();
                    } else {
                        resultHandler.handle((Object)Future.failedFuture((Throwable)openResult.cause()));
                    }
                }).open();
                this.connection.open();
            } else {
                resultHandler.handle((Object)Future.failedFuture((Throwable)connectResult.cause()));
            }
        });
    }

    public MessageConsumer<JsonObject> createConsumer(String amqpAddress) {
        if (!this.started.get()) {
            throw new IllegalStateException("Bridge was not successfully started");
        }
        return new AmqpConsumerImpl(this, this.connection, amqpAddress);
    }

    public MessageProducer<JsonObject> createProducer(String amqpAddress) {
        if (!this.started.get()) {
            throw new IllegalStateException("Bridge was not successfully started");
        }
        return new AmqpProducerImpl(this, this.connection, amqpAddress);
    }

    @Override
    public void close(Handler<AsyncResult<Void>> resultHandler) {
        this.runOnContext(true, (Handler<Void>)((Handler)v -> this.shutdownImpl(resultHandler)));
    }

    private void shutdownImpl(Handler<AsyncResult<Void>> resultHandler) {
        if (this.connection != null) {
            this.connection.closeHandler(res -> {
                try {
                    if (res.succeeded()) {
                        resultHandler.handle((Object)Future.succeededFuture());
                    } else {
                        resultHandler.handle((Object)Future.failedFuture((Throwable)res.cause()));
                    }
                }
                finally {
                    this.connection.disconnect();
                    this.connection = null;
                }
            }).close();
        }
    }

    <R> void registerReplyToHandler(org.apache.qpid.proton.message.Message msg, Handler<AsyncResult<Message<R>>> replyHandler) {
        this.verifyReplyToAddressAvailable();
        msg.setReplyTo(this.replyToConsumerAddress);
        String generatedMessageId = UUID.randomUUID().toString();
        msg.setMessageId((Object)generatedMessageId);
        this.replyToMapping.put(generatedMessageId, replyHandler);
    }

    void verifyReplyToAddressAvailable() throws IllegalStateException {
        if (this.replyToConsumerAddress == null) {
            throw new IllegalStateException("No reply-to address available, unable to send with a reply handler. Try an explicit consumer for replies.");
        }
    }

    private void handleIncomingMessageReply(ProtonDelivery delivery, org.apache.qpid.proton.message.Message protonMessage) {
        Handler<?> handler;
        Object correlationId = protonMessage.getCorrelationId();
        if (correlationId != null && (handler = this.replyToMapping.remove(correlationId)) != null) {
            Handler<?> h = handler;
            JsonObject body = this.translator.convertToJsonObject(protonMessage);
            AmqpMessageImpl msg = new AmqpMessageImpl(body, this, protonMessage, delivery, this.replyToConsumerAddress, protonMessage.getReplyTo());
            Future result = Future.succeededFuture((Object)msg);
            h.handle((Object)result);
            return;
        }
        LOG.error((Object)("Received message on replyTo consumer, could not match to a replyHandler: " + protonMessage));
    }

    <R> void sendReply(org.apache.qpid.proton.message.Message origIncomingMessage, JsonObject replyBody, Handler<AsyncResult<Message<R>>> replyHandler) {
        if (this.replySender == null) {
            throw new IllegalStateException("No reply sender available, unable to send implicit replies. Try an explicit producer for replies.");
        }
        String replyAddress = origIncomingMessage.getReplyTo();
        if (replyAddress == null) {
            throw new IllegalStateException("Original message has no reply-to address, unable to send implicit reply");
        }
        Object origMessageId = origIncomingMessage.getMessageId();
        if (origMessageId != null) {
            JsonObject replyBodyProps = replyBody.getJsonObject("properties");
            if (replyBodyProps == null) {
                replyBodyProps = new JsonObject();
                replyBody.put("properties", replyBodyProps);
            }
            replyBodyProps.put("correlation_id", origMessageId);
        }
        this.replySender.doSend(replyBody, replyHandler, replyAddress);
    }

    boolean onContextEventLoop() {
        return ((ContextInternal)this.bridgeContext).nettyEventLoop().inEventLoop();
    }

    void runOnContext(boolean immediateIfOnContext, Handler<Void> action) {
        if (immediateIfOnContext && this.onContextEventLoop()) {
            action.handle(null);
        } else {
            this.bridgeContext.runOnContext(action);
        }
    }
}

