/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream.simple;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StorageType;
import io.nats.examples.jetstream.NatsJsUtils;
import io.nats.examples.jetstream.ResilientPublisher;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class FetchResilientExample
implements Runnable {
    static final String STREAM = "fetch-resilient-stream";
    static final String SUBJECT = "fetch-resilient-subject";
    static final String CONSUMER_NAME = "fetch-resilient-consumer";
    static final String MESSAGE_PREFIX = "fetch-resilient-message";
    static final int FETCH_SIZE = 500;
    static final int PUBLISH_DELAY = 100;
    static final long CONSUME_REPORT_FREQUENCY = 2000L;
    static final long PUB_REPORT_FREQUENCY = 5000L;
    static final long STREAM_REPORT_FREQUENCY = 30000L;
    static String SERVER = "nats://localhost:4222";
    public final AtomicBoolean keepGoing = new AtomicBoolean(true);
    private final ConsumerContext cc;
    private long reportAt = 0L;

    public static void main(String[] args) {
        Options options = Options.builder().server(SERVER).connectionListener((c, t) -> NatsJsUtils.report("Connection: " + c.getServerInfo().getPort() + " " + (Object)((Object)t))).build();
        try (Connection nc = Nats.connect(options);){
            JetStreamManagement jsm = nc.jetStreamManagement();
            NatsJsUtils.createOrReplaceStream(jsm, STREAM, StorageType.File, SUBJECT);
            Thread streamReportingThread = NatsJsUtils.getStreamReportingThread(jsm, STREAM, 30000L);
            streamReportingThread.start();
            ResilientPublisher rp = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).delay(100L).reportFrequency(5000L);
            Thread pubThread = new Thread(rp);
            pubThread.start();
            JetStream js = jsm.jetStream();
            StreamContext sc = js.getStreamContext(STREAM);
            ConsumerContext cc = sc.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).filterSubject(SUBJECT).build());
            FetchResilientExample example = new FetchResilientExample(cc);
            Thread consumeThread = new Thread(example);
            consumeThread.start();
            consumeThread.join();
        }
        catch (IOException iOException) {
        }
        catch (InterruptedException interruptedException) {
        }
        catch (JetStreamApiException jetStreamApiException) {
            // empty catch block
        }
    }

    public FetchResilientExample(ConsumerContext cc) {
        this.cc = cc;
    }

    public void stop() {
        this.keepGoing.set(false);
    }

    @Override
    public void run() {
        long lastReadStreamSeq = 0L;
        while (this.keepGoing.get()) {
            try (FetchConsumer fc2 = this.cc.fetchMessages(500);){
                Message m = fc2.nextMessage();
                while (m != null) {
                    lastReadStreamSeq = m.metaData().streamSequence();
                    if (System.currentTimeMillis() > this.reportAt) {
                        NatsJsUtils.report("Last Read Sequence: " + lastReadStreamSeq);
                        this.reportAt = System.currentTimeMillis() + 2000L;
                    }
                    m.ack();
                    m = fc2.nextMessage();
                }
            }
            catch (Exception fc2) {
                // empty catch block
            }
            try {
                if (System.currentTimeMillis() > this.reportAt) {
                    NatsJsUtils.report("Last Read Sequence: " + lastReadStreamSeq);
                    this.reportAt = System.currentTimeMillis() + 2000L;
                }
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

