/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.jms;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessageMetadata;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.jms.JmsResourceHolder;
import io.smallrye.reactive.messaging.jms.i18n.JmsExceptions;
import io.smallrye.reactive.messaging.jms.i18n.JmsLogging;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.core.Context;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import jakarta.jms.Topic;
import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Inbound side of the JMS connector: exposes the messages received by a {@link JMSConsumer}
 * as a {@link Multi} of {@link IncomingJmsMessage}.
 *
 * <p>The consumer is obtained through a {@link JmsResourceHolder}; messages are pulled (or pushed,
 * once demand becomes unbounded) by the nested {@link JmsPublisher} and emitted on a dedicated
 * Vert.x event-loop context.
 */
class JmsSource {
    private final Multi<IncomingJmsMessage<?>> source;
    private final JmsResourceHolder<JMSConsumer> resourceHolder;
    private final JmsPublisher publisher;
    private final boolean isTracingEnabled;
    /** Only set when tracing is enabled; {@code null} otherwise. */
    private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
    private final io.vertx.mutiny.core.Context context;

    /**
     * Configures the resource holder, eagerly creates the consumer and assembles the message stream.
     *
     * @param vertx used to create the event-loop context on which items are emitted
     * @param resourceHolder holder that is configured here with the destination and consumer factories
     * @param config incoming channel configuration (destination, selector, durability, retry, ...)
     * @param openTelemetryInstance passed to the tracing instrumenter when tracing is enabled
     * @param jsonMapping handed to every {@link IncomingJmsMessage}
     * @param executor handed to every {@link IncomingJmsMessage}
     */
    JmsSource(Vertx vertx, JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping, Executor executor) {
        this.isTracingEnabled = config.getTracingEnabled();
        String channel = config.getChannel();
        // The destination name falls back to the channel name when not configured.
        String destinationName = config.getDestination().orElseGet(config::getChannel);
        String selector = config.getSelector().orElse(null);
        boolean nolocal = config.getNoLocal();
        boolean durable = config.getDurable();
        String type = config.getDestinationType();
        boolean retry = config.getRetry();

        this.resourceHolder = resourceHolder.configure(
                r -> this.getDestination(r.getContext(), destinationName, type),
                r -> {
                    if (durable) {
                        // Durable consumers can only be created on topics.
                        if (!(r.getDestination() instanceof Topic)) {
                            throw JmsExceptions.ex.illegalArgumentInvalidDestination();
                        }
                        return r.getContext().createDurableConsumer((Topic) r.getDestination(), destinationName, selector, nolocal);
                    }
                    return r.getContext().createConsumer(r.getDestination(), selector, nolocal);
                });
        // Request the client right away so that it is obtained during construction.
        resourceHolder.getClient();

        this.jmsInstrumenter = this.isTracingEnabled
                ? JmsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance)
                : null;
        this.publisher = new JmsPublisher(resourceHolder);
        this.context = io.vertx.mutiny.core.Context.newInstance((Context) ((VertxInternal) vertx.getDelegate()).createEventLoopContext());

        this.source = Multi.createFrom().publisher(this.publisher)
                .emitOn(this.context::runOnContext)
                .<IncomingJmsMessage<?>>map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
                .onItem().invoke(this::incomingTrace)
                // The predicate also logs and releases the JMS resources; retrying only
                // happens when the channel is configured with retry enabled.
                .onFailure(t -> {
                    JmsLogging.log.terminalErrorOnChannel(channel);
                    this.resourceHolder.close();
                    return retry;
                })
                .retry()
                .withBackOff(Duration.parse(config.getRetryInitialDelay()), Duration.parse(config.getRetryMaxDelay()))
                .withJitter(config.getRetryJitter())
                .atMost(config.getRetryMaxRetries())
                .onFailure().invoke(throwable -> JmsLogging.log.terminalErrorRetriesExhausted(config.getChannel(), throwable))
                .plug(m -> {
                    if (config.getBroadcast()) {
                        return m.broadcast().toAllSubscribers();
                    }
                    return m;
                });
    }

    /** Completes the current subscriber (if any), stops the receiving executor and closes the JMS resources. */
    void close() {
        this.publisher.close();
        this.resourceHolder.close();
    }

    /**
     * Creates the JMS destination for the given name.
     *
     * @param context JMS context used to create the destination
     * @param name queue or topic name
     * @param type {@code "queue"} or {@code "topic"}, compared case-insensitively
     * @return the created {@link Queue} or {@link Topic}
     * @throws RuntimeException the exception produced by
     *         {@code JmsExceptions.ex.illegalArgumentUnknownDestinationType} for any other type
     */
    private Destination getDestination(JMSContext context, String name, String type) {
        // Locale.ROOT keeps the comparison independent of the JVM default locale
        // (e.g. "TOPIC" lower-cases to "topıc" under a Turkish locale).
        switch (type.toLowerCase(Locale.ROOT)) {
            case "queue":
                JmsLogging.log.creatingQueue(name);
                return context.createQueue(name);
            case "topic":
                JmsLogging.log.creatingTopic(name);
                return context.createTopic(name);
            default:
                throw JmsExceptions.ex.illegalArgumentUnknownDestinationType(type);
        }
    }

    /**
     * @return the stream of incoming JMS messages assembled by the constructor
     */
    Multi<IncomingJmsMessage<?>> getSource() {
        return this.source;
    }

    /**
     * Reports an incoming message to the tracing instrumenter; does nothing when tracing is disabled.
     *
     * <p>The trace carries the underlying JMS message and the queue name. The queue name is
     * {@code null} when the message has no {@link IncomingJmsMessageMetadata}, when its destination
     * is not a {@link Queue}, or when the name cannot be read.
     *
     * @param jmsMessage the message that was just received
     */
    public void incomingTrace(IncomingJmsMessage<?> jmsMessage) {
        if (!this.isTracingEnabled) {
            return;
        }
        Optional<IncomingJmsMessageMetadata> metadata = jmsMessage.getMetadata(IncomingJmsMessageMetadata.class);
        Optional<String> queueName = metadata.map(a -> {
            Destination destination = a.getDestination();
            if (destination instanceof Queue) {
                try {
                    return ((Queue) destination).getQueueName();
                } catch (JMSException ignored) {
                    // The queue name is optional trace information; fall through to "unknown".
                    return null;
                }
            }
            return null;
        });
        Message unwrapped = jmsMessage.unwrap(Message.class);
        JmsTrace jmsTrace = new JmsTrace.Builder()
                .withQueue(queueName.orElse(null))
                .withMessage(unwrapped)
                .build();
        this.jmsInstrumenter.traceIncoming((org.eclipse.microprofile.reactive.messaging.Message<?>) jmsMessage, jmsTrace);
    }

    /**
     * Single-subscriber publisher bridging a {@link JMSConsumer} to {@link Flow}.
     *
     * <p>Bounded demand is served by scheduling one blocking {@code receive()} per requested item on
     * a single-thread executor. Once the accumulated demand reaches {@link Long#MAX_VALUE} the
     * publisher installs a message listener on the consumer instead.
     *
     * <p>NOTE(review): {@link #cancel()} shuts the executor down, so a later re-subscription
     * (for instance by a retry stage) could not schedule bounded receptions any more — confirm
     * whether that is intended.
     */
    private static class JmsPublisher
    implements Flow.Publisher<Message>,
    Flow.Subscription {
        /** Outstanding demand; saturates at {@link Long#MAX_VALUE}. */
        private final AtomicLong requests = new AtomicLong();
        /** The current subscriber, or {@code null} when none is attached (never subscribed, closed or failed). */
        private final AtomicReference<Flow.Subscriber<? super Message>> downstream = new AtomicReference<>();
        private final JmsResourceHolder<JMSConsumer> consumerHolder;
        private final ExecutorService executor;
        /** Set once the message listener has been installed; further requests are then ignored. */
        private boolean unbounded;

        private JmsPublisher(JmsResourceHolder<JMSConsumer> resourceHolder) {
            this.consumerHolder = resourceHolder;
            this.executor = Executors.newSingleThreadExecutor();
        }

        /** Detaches and completes the current subscriber, if any, then shuts the executor down. */
        void close() {
            Flow.Subscriber<? super Message> subscriber = this.downstream.getAndSet(null);
            if (subscriber != null) {
                subscriber.onComplete();
            }
            this.executor.shutdown();
        }

        /**
         * Attaches the subscriber if none is attached yet; otherwise fails it with the
         * "already subscriber" error.
         */
        @Override
        public void subscribe(Flow.Subscriber<? super Message> s) {
            if (this.downstream.compareAndSet(null, s)) {
                s.onSubscribe(this);
            } else {
                Subscriptions.fail(s, (Throwable) JmsExceptions.ex.illegalStateAlreadySubscriber());
            }
        }

        /**
         * Records the demand and triggers reception.
         *
         * <p>Non-positive requests, requests made once unbounded reception is active, and requests
         * made while no subscriber is attached (after cancellation, completion or failure) are ignored.
         */
        @Override
        public void request(long n) {
            if (n <= 0L || this.unbounded || this.downstream.get() == null) {
                return;
            }
            long total = this.add(n);
            if (total == Long.MAX_VALUE) {
                this.unbounded = true;
                this.startUnboundedReception();
            } else {
                this.enqueue(n);
            }
        }

        /** Schedules {@code n} single-message receptions on the executor. */
        private void enqueue(long n) {
            for (long i = 0L; i < n; i++) {
                this.executor.execute(this::receiveOne);
            }
        }

        /**
         * Blocks until one message has been received and forwards it downstream.
         *
         * <p>Stops early when the subscriber is detached. A {@link JMSRuntimeException} is reported
         * to the subscriber (if still attached) as a failure.
         */
        private void receiveOne() {
            try {
                Message message = null;
                while (message == null && this.downstream.get() != null) {
                    // A null result means nothing was received; loop and re-check the subscriber.
                    message = this.consumerHolder.getClient().receive();
                    if (message == null) {
                        continue;
                    }
                    this.requests.decrementAndGet();
                    // The subscriber may have been detached while receive() was blocking,
                    // so read the reference once and check it before emitting.
                    Flow.Subscriber<? super Message> subscriber = this.downstream.get();
                    if (subscriber != null) {
                        subscriber.onNext(message);
                    }
                }
            } catch (JMSRuntimeException e) {
                JmsLogging.log.clientClosed();
                Flow.Subscriber<? super Message> subscriber = this.downstream.getAndSet(null);
                if (subscriber != null) {
                    subscriber.onError(e);
                }
            }
        }

        /** Switches the consumer to listener mode: every delivered message is forwarded downstream. */
        private void startUnboundedReception() {
            this.consumerHolder.getClient().setMessageListener(m -> {
                // The listener can still fire after the subscriber has been detached.
                Flow.Subscriber<? super Message> subscriber = this.downstream.get();
                if (subscriber != null) {
                    subscriber.onNext(m);
                }
            });
        }

        @Override
        public void cancel() {
            this.close();
        }

        /**
         * Atomically adds {@code req} to the outstanding demand, saturating at {@link Long#MAX_VALUE}.
         *
         * @param req the additional demand
         * @return the updated demand, or {@link Long#MAX_VALUE} when it was already or became unbounded
         */
        long add(long req) {
            for (;;) {
                long current = this.requests.get();
                if (current == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                long sum = current + req;
                // A negative sum means the addition overflowed.
                long next = sum < 0L ? Long.MAX_VALUE : sum;
                if (this.requests.compareAndSet(current, next)) {
                    return next;
                }
            }
        }
    }
}

