/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.jms;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.JmsConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.jms.JmsProperties;
import io.smallrye.reactive.messaging.jms.JmsPropertiesBuilder;
import io.smallrye.reactive.messaging.jms.JmsResourceHolder;
import io.smallrye.reactive.messaging.jms.OutgoingJmsMessageMetadata;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import jakarta.enterprise.inject.Instance;
import jakarta.jms.BytesMessage;
import jakarta.jms.Destination;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSProducer;
import jakarta.jms.Message;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

/**
 * Outgoing side of the JMS connector.
 *
 * <p>Exposes a {@link Flow.Subscriber} that, for each Reactive Messaging message it receives,
 * builds (or reuses) a JMS message, sends it with the {@link JMSProducer} held by a
 * {@link JmsResourceHolder}, and then acknowledges the Reactive Messaging message.
 * Sending is performed on the supplied {@link Executor}. Optional retry-with-backoff and
 * OpenTelemetry tracing are driven by the outgoing configuration.
 */
class JmsSink {
    private final Flow.Subscriber<org.eclipse.microprofile.reactive.messaging.Message<?>> sink;
    private final JsonMapping jsonMapping;
    private final Executor executor;
    /** {@code null} when tracing is disabled; only dereferenced behind {@link #isTracingEnabled}. */
    private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
    private final boolean isTracingEnabled;

    /**
     * Configures the resource holder (destination + producer factories) and builds the sink pipeline.
     *
     * @param resourceHolder holder that lazily creates and caches the JMS context, destination and producer
     * @param config outgoing channel configuration
     * @param openTelemetryInstance OpenTelemetry instance, used only when tracing is enabled
     * @param jsonMapping JSON mapper used for payloads that are neither JMS messages, strings,
     *        boxed primitives nor {@code byte[]}
     * @param executor executor on which the send + ack sequence is subscribed
     */
    JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config, Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping, Executor executor) {
        this.isTracingEnabled = config.getTracingEnabled();
        String name = config.getDestination().orElseGet(config::getChannel);
        String type = config.getDestinationType();
        boolean retry = config.getRetry();
        int retryMaxRetries = config.getRetryMaxRetries();
        Duration retryInitialDelay = Duration.parse(config.getRetryInitialDelay());
        Duration retryMaxDelay = Duration.parse(config.getRetryMaxDelay());
        double retryJitter = config.getRetryJitter();

        resourceHolder.configure(
                r -> this.getDestination(r.getContext(), name, type),
                r -> JmsSink.createProducer(r.getContext(), config));
        // Touch the destination and the producer right away.
        // NOTE(review): presumably this forces eager creation so configuration errors surface
        // at construction time rather than on the first send - confirm against JmsResourceHolder.
        resourceHolder.getDestination();
        resourceHolder.getClient();

        this.jsonMapping = jsonMapping;
        this.executor = executor;
        this.jmsInstrumenter = this.isTracingEnabled ? JmsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance) : null;
        // Messages are sent one at a time, in order. The failure predicate is the constant
        // "retry" flag, so retries only happen when retry is enabled in the configuration.
        this.sink = MultiUtils.via(m -> m.onItem()
                .transformToUniAndConcatenate(message -> this.send(resourceHolder, message)
                        .onFailure(t -> retry)
                        .retry()
                        .withJitter(retryJitter)
                        .withBackOff(retryInitialDelay, retryMaxDelay)
                        .atMost(retryMaxRetries))
                .onFailure().invoke(JmsLogging.log::unableToSend));
    }

    /**
     * Creates a {@link JMSProducer} from the given context and applies every optional
     * producer setting present in the configuration.
     *
     * @throws IllegalArgumentException (via {@code JmsExceptions}) when the delivery mode or the
     *         reply-to destination type is not one of the recognized values
     */
    private static JMSProducer createProducer(JMSContext context, JmsConnectorOutgoingConfiguration config) {
        JMSProducer producer = context.createProducer();
        config.getDeliveryDelay().ifPresent(producer::setDeliveryDelay);
        config.getDeliveryMode().ifPresent(v -> {
            if (v.equalsIgnoreCase("persistent")) {
                producer.setDeliveryMode(2); // jakarta.jms.DeliveryMode.PERSISTENT
            } else if (v.equalsIgnoreCase("non_persistent")) {
                producer.setDeliveryMode(1); // jakarta.jms.DeliveryMode.NON_PERSISTENT
            } else {
                throw JmsExceptions.ex.illegalArgumentInvalidDeliveryMode(v);
            }
        });
        config.getDisableMessageId().ifPresent(producer::setDisableMessageID);
        config.getDisableMessageTimestamp().ifPresent(producer::setDisableMessageTimestamp);
        config.getCorrelationId().ifPresent(producer::setJMSCorrelationID);
        config.getTtl().ifPresent(producer::setTimeToLive);
        config.getPriority().ifPresent(producer::setPriority);
        config.getReplyTo().ifPresent(rt -> {
            // Declared as Destination: the value is either a Topic or a Queue.
            Destination replyToDestination;
            String replyToDestinationType = config.getReplyToDestinationType();
            if (replyToDestinationType.equalsIgnoreCase("topic")) {
                replyToDestination = context.createTopic(rt);
            } else if (replyToDestinationType.equalsIgnoreCase("queue")) {
                replyToDestination = context.createQueue(rt);
            } else {
                throw JmsExceptions.ex.illegalArgumentInvalidDestinationType(replyToDestinationType);
            }
            producer.setJMSReplyTo(replyToDestination);
        });
        return producer;
    }

    /**
     * Sends one Reactive Messaging message over JMS.
     *
     * <p>If the payload already is a JMS {@link Message} it is sent as-is to the configured
     * destination. Otherwise a JMS message is built from the payload, any
     * {@link OutgoingJmsMessageMetadata} attached to the message is applied, and the message is
     * sent to the metadata destination when one is set, or to the configured destination.
     *
     * @return a {@link Uni} that performs the send and then acknowledges {@code message}; a failed
     *         {@link Uni} carrying an {@link IllegalStateException} if building the JMS message
     *         raised a {@link JMSException}
     */
    private Uni<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> send(JmsResourceHolder<JMSProducer> resourceHolder, org.eclipse.microprofile.reactive.messaging.Message<?> message) {
        Object payload = message.getPayload();
        Destination destination = resourceHolder.getDestination();
        JMSContext context = resourceHolder.getContext();
        if (payload instanceof Message) {
            Message jmsMessage = (Message) payload;
            this.outgoingTrace(destination, message, jmsMessage);
            return this.dispatch(message, () -> resourceHolder.getClient().send(destination, jmsMessage));
        }
        try {
            Message outgoing = this.toJmsMessage(context, payload);
            Destination actualDestination = destination;
            OutgoingJmsMessageMetadata metadata = message.getMetadata(OutgoingJmsMessageMetadata.class).orElse(null);
            if (metadata != null) {
                JmsSink.applyMetadata(outgoing, metadata);
                Destination override = metadata.getDestination();
                if (override != null) {
                    actualDestination = override;
                }
            }
            Destination target = actualDestination;
            this.outgoingTrace(target, message, outgoing);
            return this.dispatch(message, () -> resourceHolder.getClient().send(target, outgoing));
        } catch (JMSException e) {
            return Uni.createFrom().failure(new IllegalStateException(e));
        }
    }

    /**
     * Converts a non-JMS payload into a JMS message.
     *
     * <ul>
     * <li>{@link String} and boxed primitives: text message holding {@code toString()}.</li>
     * <li>{@code byte[]}: bytes message holding the array content.</li>
     * <li>anything else: text message holding the JSON produced by the {@link JsonMapping}.</li>
     * </ul>
     * For the two text variants the payload class name is recorded both as the JMS type and in
     * the {@code _classname} string property.
     */
    private Message toJmsMessage(JMSContext context, Object payload) throws JMSException {
        Class<?> payloadClass = payload.getClass();
        if (payload instanceof String || this.isPrimitiveBoxed(payloadClass)) {
            TextMessage text = context.createTextMessage(payload.toString());
            text.setStringProperty("_classname", payloadClass.getName());
            text.setJMSType(payloadClass.getName());
            return text;
        }
        if (payload instanceof byte[]) {
            BytesMessage bytes = context.createBytesMessage();
            bytes.writeBytes((byte[]) payload);
            return bytes;
        }
        TextMessage json = context.createTextMessage(this.jsonMapping.toJson(payload));
        json.setJMSType(payloadClass.getName());
        json.setStringProperty("_classname", payloadClass.getName());
        return json;
    }

    /**
     * Copies the values set on the outgoing metadata (correlation id, reply-to, destination,
     * delivery mode, type, properties) onto the JMS message. Unset values are skipped:
     * {@code null} for references, {@code -1} for the delivery mode.
     *
     * @throws IllegalStateException (via {@code JmsExceptions}) when the metadata properties are
     *         not a {@code JmsPropertiesBuilder.OutgoingJmsProperties}
     */
    private static void applyMetadata(Message outgoing, OutgoingJmsMessageMetadata metadata) throws JMSException {
        String correlationId = metadata.getCorrelationId();
        Destination replyTo = metadata.getReplyTo();
        Destination dest = metadata.getDestination();
        int deliveryMode = metadata.getDeliveryMode();
        String type = metadata.getType();
        JmsProperties properties = metadata.getProperties();
        if (correlationId != null) {
            outgoing.setJMSCorrelationID(correlationId);
        }
        if (replyTo != null) {
            outgoing.setJMSReplyTo(replyTo);
        }
        if (dest != null) {
            outgoing.setJMSDestination(dest);
        }
        if (deliveryMode != -1) {
            outgoing.setJMSDeliveryMode(deliveryMode);
        }
        if (type != null) {
            // Overrides the class-name based type set when the message was built.
            outgoing.setJMSType(type);
        }
        if (properties != null) {
            if (!(properties instanceof JmsPropertiesBuilder.OutgoingJmsProperties)) {
                throw JmsExceptions.ex.illegalStateUnableToMapProperties(properties.getClass().getName());
            }
            JmsPropertiesBuilder.OutgoingJmsProperties op = (JmsPropertiesBuilder.OutgoingJmsProperties) properties;
            op.getProperties().forEach(p -> p.apply(outgoing));
        }
    }

    /**
     * Records an outgoing trace for the message when tracing is enabled; does nothing otherwise.
     *
     * @param actualDestination destination the message is about to be sent to
     * @param message the Reactive Messaging message being sent
     * @param payload the JMS message carried in the trace
     */
    private void outgoingTrace(Destination actualDestination, org.eclipse.microprofile.reactive.messaging.Message<?> message, Message payload) {
        if (!this.isTracingEnabled) {
            return;
        }
        JmsTrace jmsTrace = new JmsTrace.Builder()
                .withQueue(actualDestination.toString())
                .withMessage(payload)
                .build();
        this.jmsInstrumenter.traceOutgoing(message, jmsTrace);
    }

    /** Returns {@code true} when {@code c} is one of the eight primitive wrapper classes. */
    private boolean isPrimitiveBoxed(Class<?> c) {
        return c.equals(Boolean.class) || c.equals(Integer.class) || c.equals(Byte.class) || c.equals(Double.class) || c.equals(Float.class) || c.equals(Short.class) || c.equals(Character.class) || c.equals(Long.class);
    }

    /**
     * Builds the send-then-ack pipeline for one message.
     *
     * @param incoming the Reactive Messaging message to emit and acknowledge
     * @param action the actual JMS send; runs before the acknowledgement
     * @return a {@link Uni} emitting {@code incoming} once {@code action} has run and the ack
     *         completion stage has completed, subscribed on this sink's executor
     */
    private Uni<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> dispatch(org.eclipse.microprofile.reactive.messaging.Message<?> incoming, Runnable action) {
        return Uni.createFrom().item(incoming)
                .invoke(action)
                .call(message -> Uni.createFrom().completionStage(() -> incoming.ack()))
                .runSubscriptionOn(this.executor);
    }

    /**
     * Creates the destination of the given type.
     *
     * @param type {@code "queue"} or {@code "topic"}, matched case-insensitively and
     *        independently of the default locale
     * @throws IllegalStateException (via {@code JmsExceptions}) for any other type
     */
    private Destination getDestination(JMSContext context, String name, String type) {
        if ("queue".equalsIgnoreCase(type)) {
            return context.createQueue(name);
        }
        if ("topic".equalsIgnoreCase(type)) {
            return context.createTopic(name);
        }
        throw JmsExceptions.ex.illegalStateUnknownDestinationType(type);
    }

    /** Returns the subscriber that consumes outgoing messages and sends them over JMS. */
    Flow.Subscriber<org.eclipse.microprofile.reactive.messaging.Message<?>> getSink() {
        return this.sink;
    }
}

