/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.queue.nats;

import com.netflix.conductor.contribs.queue.nats.JsmMessage;
import com.netflix.conductor.contribs.queue.nats.NatsException;
import com.netflix.conductor.contribs.queue.nats.config.JetStreamProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;

/**
 * {@link ObservableQueue} implementation backed by a NATS JetStream subject.
 *
 * <p>Messages delivered by the JetStream push subscription are buffered in an in-memory queue.
 * Observers obtained through {@link #observe()} receive the buffered messages in batches, drained
 * on a fixed interval taken from {@link JetStreamProperties#getPollTimeDuration()}.
 *
 * <p>The NATS connection is established asynchronously by {@link #start()}; the queue only
 * reports {@link #isRunning()} as {@code true} once the subscription has been created from the
 * connection listener callback.
 */
public class JetStreamObservableQueue implements ObservableQueue {
    private static final Logger LOG = LoggerFactory.getLogger(JetStreamObservableQueue.class);

    /** Buffer between the NATS dispatcher callback (producer) and the polling observable (consumer). */
    private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    /** Serializes calls to {@link #start()}. */
    private final Lock mu = new ReentrantLock();

    private final String queueType;
    private final String subject;
    private final JetStreamProperties properties;
    private final Scheduler scheduler;
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Both fields are assigned from the NATS connection-listener callback and read from the
    // threads calling stop()/size(), hence volatile for visibility.
    private volatile Connection nc;
    private volatile JetStreamSubscription sub;

    /**
     * Creates a queue bound to a JetStream subject. No connection is opened until
     * {@link #start()} is called.
     *
     * @param properties JetStream configuration (server URL, durable name, poll interval, storage type)
     * @param queueType value returned by {@link #getType()}
     * @param subject JetStream subject; also used as the stream name and as the queue name/URI
     * @param scheduler scheduler on which the polling interval is run
     */
    public JetStreamObservableQueue(JetStreamProperties properties, String queueType, String subject, Scheduler scheduler) {
        LOG.debug("JSM obs queue create, qtype={}, quri={}", queueType, subject);
        this.queueType = queueType;
        this.subject = subject;
        this.properties = properties;
        this.scheduler = scheduler;
    }

    /**
     * Returns an observable that, once subscribed, periodically emits the messages buffered
     * since the previous tick.
     *
     * @return observable of received messages
     */
    public Observable<Message> observe() {
        return Observable.create(this.getOnSubscribe());
    }

    /**
     * Builds the subscription action: on every interval tick, drain the buffered messages and
     * forward them to the subscriber. Ticks are skipped (nothing is emitted and the buffer is left
     * untouched) while the queue is not running.
     */
    private Observable.OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            long pollMillis = this.properties.getPollTimeDuration().toMillis();
            Observable<Long> interval = Observable.interval(pollMillis, TimeUnit.MILLISECONDS, this.scheduler);
            interval.flatMap(tick -> {
                if (!this.isRunning()) {
                    LOG.debug("Component stopped, skip listening for messages from JSM Queue '{}'", this.subject);
                    return Observable.from(Collections.<Message>emptyList());
                }
                List<Message> available = new ArrayList<>();
                this.messages.drainTo(available);
                if (!available.isEmpty()) {
                    LOG.debug("Processing JSM queue '{}' batch messages count={}", this.subject, available.size());
                }
                return Observable.from(available);
            }).subscribe(subscriber::onNext, subscriber::onError);
        };
    }

    /** @return the queue type supplied at construction */
    public String getType() {
        return this.queueType;
    }

    /** @return the JetStream subject this queue is bound to */
    public String getName() {
        return this.subject;
    }

    /** @return same value as {@link #getName()} */
    public String getURI() {
        return this.getName();
    }

    /**
     * Acknowledges each message on its underlying JetStream message.
     *
     * @param messages messages to acknowledge; every element must be a {@link JsmMessage}
     * @return always an empty list
     * @throws ClassCastException if an element is not a {@link JsmMessage}
     */
    public List<String> ack(List<Message> messages) {
        messages.forEach(m -> ((JsmMessage) m).getJsmMsg().ack());
        return Collections.emptyList();
    }

    /**
     * Publishes the payload of each message to the subject, encoded as UTF-8.
     *
     * <p>A dedicated connection is opened for the call and closed afterwards. Nothing is done
     * (and no connection is opened) when {@code messages} is {@code null} or empty.
     *
     * @param messages messages whose payloads are published
     * @throws NatsException if connecting or publishing fails, or the thread is interrupted
     */
    public void publish(List<Message> messages) {
        if (messages == null || messages.isEmpty()) {
            return;
        }
        try (Connection conn = Nats.connect(this.properties.getUrl())) {
            JetStream js = conn.jetStream();
            for (Message msg : messages) {
                // Explicit charset: the no-arg getBytes() depends on the platform default.
                js.publish(this.subject, msg.getPayload().getBytes(StandardCharsets.UTF_8));
            }
        }
        catch (JetStreamApiException | IOException e) {
            throw new NatsException("Failed to publish to jsm", e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NatsException("Failed to publish to jsm", e);
        }
    }

    /** Not supported by this queue; the call is a no-op. */
    public void setUnackTimeout(Message message, long unackTimeout) {
    }

    /**
     * Returns the number of pending messages reported by the consumer info of the current
     * subscription.
     *
     * @return pending message count, or {@code 0} when not yet subscribed or when the consumer
     *     info cannot be fetched
     */
    public long size() {
        JetStreamSubscription subscription = this.sub;
        if (subscription == null) {
            // The subscription is created asynchronously after start(); nothing to report yet.
            return 0L;
        }
        try {
            return subscription.getConsumerInfo().getNumPending();
        }
        catch (JetStreamApiException | IOException e) {
            LOG.warn("Failed to get stream '{}' info", this.subject, e);
            return 0L;
        }
    }

    /** Initiates the asynchronous NATS connection unless the queue is already running. */
    public void start() {
        this.mu.lock();
        try {
            this.natsConnect();
        }
        finally {
            this.mu.unlock();
        }
    }

    /**
     * Closes the NATS connection (if one was established) and marks the queue as not running.
     *
     * <p>The polling observable is intentionally left subscribed: it stops emitting as soon as
     * {@link #isRunning()} turns {@code false} and resumes if the queue is started again.
     */
    public void stop() {
        try {
            Connection connection = this.nc;
            if (connection != null) {
                connection.close();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Failed to close Nats connection", e);
        }
        finally {
            // Always flip the flag, even if close() fails unexpectedly.
            this.running.set(false);
        }
    }

    /** @return {@code true} once the JetStream subscription has been created and until {@link #stop()} */
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * Starts an asynchronous connection to the configured server with unlimited reconnects.
     * The connection listener stores the connection and (re)subscribes on connect/reconnect events.
     */
    private void natsConnect() {
        if (this.running.get()) {
            return;
        }
        LOG.info("Starting JSM observable, name={}", this.subject);
        try {
            Options options = new Options.Builder()
                    .connectionListener((conn, type) -> {
                        LOG.info("Connection to JSM updated: {}", type);
                        this.nc = conn;
                        this.subscribeOnce(conn, type);
                    })
                    .server(this.properties.getUrl())
                    .maxReconnects(-1)
                    .build();
            Nats.connectAsynchronously(options, true);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NatsException("Failed to connect to JSM", e);
        }
    }

    /**
     * Adds a work-queue stream named after the subject. Failure to add the stream is logged and
     * otherwise ignored so that subscribing can still be attempted.
     *
     * @throws NatsException if the JetStream management context cannot be obtained
     */
    private void createStream(Connection nc) {
        JetStreamManagement jsm;
        try {
            jsm = nc.jetStreamManagement();
        }
        catch (IOException e) {
            throw new NatsException("Failed to get jsm management", e);
        }
        StreamConfiguration streamConfig = StreamConfiguration.builder()
                .name(this.subject)
                .retentionPolicy(RetentionPolicy.WorkQueue)
                .storageType(StorageType.get(this.properties.getStreamStorageType()))
                .build();
        try {
            StreamInfo streamInfo = jsm.addStream(streamConfig);
            LOG.debug("Create stream, info: {}", streamInfo);
        }
        catch (JetStreamApiException | IOException e) {
            LOG.error("Failed to add stream: " + streamConfig, e);
        }
    }

    /** Creates the stream and subscribes when the connection event is CONNECTED or RECONNECTED. */
    private void subscribeOnce(Connection nc, ConnectionListener.Events type) {
        // NOTE(review): a new dispatcher/subscription is created on every RECONNECTED event as
        // well; confirm this does not duplicate deliveries with the client's own resubscription.
        if (type == ConnectionListener.Events.CONNECTED || type == ConnectionListener.Events.RECONNECTED) {
            this.createStream(nc);
            this.subscribe(nc);
        }
    }

    /**
     * Creates the durable push subscription whose handler wraps each incoming message in a
     * {@link JsmMessage} and buffers it. Marks the queue as running on success; failures are
     * logged and leave the running flag unchanged.
     */
    private void subscribe(Connection nc) {
        try {
            JetStream js = nc.jetStream();
            String durableName = this.properties.getDurableName();
            PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(durableName).build();
            LOG.debug("Subscribing jsm, subject={}, options={}", this.subject, pso);
            // Auto-ack is disabled; messages are acknowledged explicitly through ack().
            JetStreamSubscription subscription = js.subscribe(this.subject, durableName, nc.createDispatcher(), msg -> {
                JsmMessage message = new JsmMessage();
                message.setJsmMsg(msg);
                message.setId(msg.getSID());
                // Explicit charset: new String(byte[]) depends on the platform default.
                message.setPayload(new String(msg.getData(), StandardCharsets.UTF_8));
                this.messages.add(message);
            }, false, pso);
            this.sub = subscription;
            LOG.debug("Subscribed successfully {}", subscription.getConsumerInfo());
            this.running.set(true);
        }
        catch (JetStreamApiException | IOException e) {
            LOG.error("Failed to subscribe", e);
        }
    }
}

