/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.amqp;

import io.reactivex.Flowable;
import io.reactivex.processors.MulticastProcessor;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.axle.amqp.AmqpClient;
import io.vertx.axle.amqp.AmqpMessageBuilder;
import io.vertx.axle.amqp.AmqpReceiver;
import io.vertx.axle.amqp.AmqpSender;
import io.vertx.axle.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Connector(value="smallrye-amqp")
public class AmqpConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConnector.class);
    static final String CONNECTOR_NAME = "smallrye-amqp";
    private AmqpClient client;
    @Inject
    private Instance<Vertx> instanceOfVertx;
    @Inject
    @ConfigProperty(name="amqp-port", defaultValue="5672")
    private Integer configuredPort;
    @Inject
    @ConfigProperty(name="amqp-server", defaultValue="localhost")
    private String configuredHost;
    @Inject
    @ConfigProperty(name="amqp-username")
    private Optional<String> configuredUsername;
    @Inject
    @ConfigProperty(name="amqp-password")
    private Optional<String> configuredPassword;
    private boolean internalVertxInstance = false;
    private Vertx vertx = null;

    public void terminate(@Observes @BeforeDestroyed(value=ApplicationScoped.class) Object event) {
        if (this.internalVertxInstance) {
            this.vertx.close();
        }
    }

    @PostConstruct
    void init() {
        if (this.instanceOfVertx == null || this.instanceOfVertx.isUnsatisfied()) {
            this.internalVertxInstance = true;
            this.vertx = Vertx.vertx();
        } else {
            this.vertx = (Vertx)this.instanceOfVertx.get();
        }
    }

    AmqpConnector() {
    }

    private synchronized AmqpClient getClient(Config config) {
        if (this.client != null) {
            return this.client;
        }
        try {
            String username = config.getOptionalValue("username", String.class).orElseGet(() -> {
                if (this.configuredUsername != null) {
                    return this.configuredUsername.orElse(null);
                }
                return null;
            });
            String password = config.getOptionalValue("password", String.class).orElseGet(() -> {
                if (this.configuredPassword != null) {
                    return this.configuredPassword.orElse(null);
                }
                return null;
            });
            String host = config.getOptionalValue("host", String.class).orElseGet(() -> {
                if (this.configuredHost == null) {
                    LOGGER.info("No AMQP host configured, using localhost");
                    return "localhost";
                }
                return this.configuredHost;
            });
            int port = config.getOptionalValue("port", Integer.class).orElseGet(() -> {
                if (this.configuredPort == null) {
                    return 5672;
                }
                return this.configuredPort;
            });
            String containerId = config.getOptionalValue("containerId", String.class).orElse(null);
            AmqpClientOptions options = new AmqpClientOptions().setUsername(username).setPassword(password).setHost(host).setPort(port).setContainerId(containerId).setReconnectAttempts(100).setReconnectInterval(10L).setConnectTimeout(1000);
            this.client = AmqpClient.create((io.vertx.axle.core.Vertx)new io.vertx.axle.core.Vertx(this.vertx.getDelegate()), (AmqpClientOptions)options);
            return this.client;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Flowable<? extends Message> getStreamOfMessages(AmqpReceiver receiver) {
        return Flowable.defer(() -> Flowable.fromPublisher((Publisher)receiver.toPublisher())).map(AmqpMessage::new);
    }

    private String getAddressOrFail(Config config) {
        return config.getOptionalValue("address", String.class).orElseGet(() -> (String)config.getOptionalValue("channel-name", String.class).orElseThrow(() -> new IllegalArgumentException("Address must be set")));
    }

    public PublisherBuilder<? extends Message> getPublisherBuilder(Config config) {
        String address = this.getAddressOrFail(config);
        boolean broadcast = config.getOptionalValue("broadcast", Boolean.class).orElse(false);
        boolean durable = config.getOptionalValue("durable", Boolean.class).orElse(true);
        boolean autoAck = config.getOptionalValue("auto-acknowledgement", Boolean.class).orElse(false);
        CompletionStage future = this.getClient(config).connect().thenCompose(connection -> connection.createReceiver(address, new AmqpReceiverOptions().setAutoAcknowledgement(autoAck).setDurable(durable)));
        PublisherBuilder builder = ReactiveStreams.fromCompletionStage(future).flatMapRsPublisher(this::getStreamOfMessages);
        if (broadcast) {
            return builder.via((Processor)MulticastProcessor.create());
        }
        return builder;
    }

    public SubscriberBuilder<? extends Message, Void> getSubscriberBuilder(Config config) {
        String address = this.getAddressOrFail(config);
        boolean durable = config.getOptionalValue("durable", Boolean.class).orElse(true);
        long ttl = config.getOptionalValue("ttl", Long.class).orElse(0L);
        AtomicReference sender = new AtomicReference();
        return ReactiveStreams.builder().flatMapCompletionStage(message -> {
            AmqpSender as = (AmqpSender)sender.get();
            if (as == null) {
                try {
                    this.client = this.getClient(config);
                }
                catch (Exception e2) {
                    LOGGER.error("Unable to create client", (Throwable)e2);
                    throw new IllegalStateException("Unable to create a client, probably a config error", e2);
                }
                return this.client.createSender(address).thenApply(s -> {
                    sender.set(s);
                    return s;
                }).thenCompose(s -> {
                    try {
                        return this.send((AmqpSender)s, (Message)message, durable, ttl);
                    }
                    catch (Exception e) {
                        LOGGER.error("Unable to send the message", (Throwable)e);
                        CompletableFuture future = new CompletableFuture();
                        future.completeExceptionally(e);
                        return future;
                    }
                }).whenComplete((m, e) -> {
                    if (e != null) {
                        LOGGER.error("Unable to send the AMQP message", e);
                    }
                });
            }
            return this.send(as, (Message)message, durable, ttl);
        }).ignore();
    }

    private CompletionStage<Message> send(AmqpSender sender, Message msg, boolean durable, long ttl) {
        io.vertx.axle.amqp.AmqpMessage amqp = msg instanceof AmqpMessage ? ((AmqpMessage)msg).getAmqpMessage() : (msg.getPayload() instanceof io.vertx.axle.amqp.AmqpMessage ? (io.vertx.axle.amqp.AmqpMessage)msg.getPayload() : (msg.getPayload() instanceof io.vertx.amqp.AmqpMessage ? new io.vertx.axle.amqp.AmqpMessage((io.vertx.amqp.AmqpMessage)msg.getPayload()) : this.convertToAmqpMessage(msg.getPayload(), durable, ttl)));
        LOGGER.debug("Sending AMQP message to address `{}` ", (Object)(amqp.address() == null ? sender.address() : amqp.address()));
        return sender.sendWithAck(amqp).thenCompose(x -> msg.ack()).thenApply(x -> msg);
    }

    private io.vertx.axle.amqp.AmqpMessage convertToAmqpMessage(Object payload, boolean durable, long ttl) {
        AmqpMessageBuilder builder = io.vertx.axle.amqp.AmqpMessage.create();
        if (durable) {
            builder.durable(true);
        }
        if (ttl > 0L) {
            builder.ttl(ttl);
        }
        if (payload instanceof String) {
            builder.withBody((String)payload);
        } else if (payload instanceof Boolean) {
            builder.withBooleanAsBody(((Boolean)payload).booleanValue());
        } else if (payload instanceof Buffer) {
            builder.withBufferAsBody((Buffer)payload);
        } else if (payload instanceof Byte) {
            builder.withByteAsBody(((Byte)payload).byteValue());
        } else if (payload instanceof Character) {
            builder.withCharAsBody(((Character)payload).charValue());
        } else if (payload instanceof Double) {
            builder.withDoubleAsBody(((Double)payload).doubleValue());
        } else if (payload instanceof Float) {
            builder.withFloatAsBody(((Float)payload).floatValue());
        } else if (payload instanceof Instant) {
            builder.withInstantAsBody((Instant)payload);
        } else if (payload instanceof Integer) {
            builder.withIntegerAsBody(((Integer)payload).intValue());
        } else if (payload instanceof JsonArray) {
            builder.withJsonArrayAsBody((JsonArray)payload);
        } else if (payload instanceof JsonObject) {
            builder.withJsonObjectAsBody((JsonObject)payload);
        } else if (payload instanceof Long) {
            builder.withLongAsBody(((Long)payload).longValue());
        } else if (payload instanceof Short) {
            builder.withShortAsBody(((Short)payload).shortValue());
        } else if (payload instanceof UUID) {
            builder.withUuidAsBody((UUID)payload);
        } else {
            builder.withBody(payload.toString());
        }
        return builder.build();
    }

    @PreDestroy
    public synchronized void close() {
        if (this.client != null) {
            this.client.close();
        }
    }
}

