/*
 * Decompiled with CFR 0.152.
 */
package io.nats.cloud.stream.binder;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;
import io.nats.cloud.stream.binder.NatsConsumerDestination;
import io.nats.cloud.stream.binder.NatsMessageHandler;
import java.time.Duration;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.Lifecycle;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.messaging.support.GenericMessage;

public class NatsMessageSource
extends AbstractMessageSource<Object>
implements Lifecycle {
    private static final Log logger = LogFactory.getLog(NatsMessageHandler.class);
    private NatsConsumerDestination destination;
    private Connection connection;
    private Subscription sub;

    public NatsMessageSource(NatsConsumerDestination destination, Connection nc) {
        this.destination = destination;
        this.connection = nc;
    }

    protected Object doReceive() {
        if (this.sub == null) {
            return null;
        }
        try {
            Message m = this.sub.nextMessage(Duration.ZERO);
            if (m != null) {
                HashMap<String, String> headers = new HashMap<String, String>();
                headers.put("subject", m.getSubject());
                headers.put("replyChannel", m.getReplyTo());
                GenericMessage gm = new GenericMessage((Object)m.getData(), headers);
                return gm;
            }
        }
        catch (InterruptedException exp) {
            logger.info((Object)"wait for message interrupted");
        }
        return null;
    }

    public boolean isRunning() {
        return this.sub != null;
    }

    public void start() {
        if (this.sub != null) {
            return;
        }
        String sub = this.destination.getSubject();
        String queue = this.destination.getQueueGroup();
        this.sub = queue != null && queue.length() > 0 ? this.connection.subscribe(sub, queue) : this.connection.subscribe(sub);
    }

    public void stop() {
        if (this.sub == null) {
            return;
        }
        this.sub.unsubscribe();
        this.sub = null;
    }

    public String getComponentType() {
        return "nats:message-source";
    }
}

