/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream.simple;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamStatusCheckedException;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.examples.jetstream.NatsJsUtils;
import java.io.IOException;

public class FetchMessagesExample {
    private static final String STREAM = "fetch-messages-stream";
    private static final String SUBJECT = "fetch-messages-subject";
    private static final String MESSAGE_TEXT = "fetch-messages";
    private static final String CONSUMER_NAME_PREFIX = "fetch-messages-consumer";
    private static final int MESSAGES = 20;
    private static final int EXPIRES_SECONDS = 2;
    public static String SERVER = "nats://localhost:4222";

    public static void main(String[] args) {
        Options options = Options.builder().server(SERVER).build();
        try (Connection nc = Nats.connect(options);){
            JetStreamManagement jsm = nc.jetStreamManagement();
            JetStream js = nc.jetStream();
            NatsJsUtils.createOrReplaceStream(jsm, STREAM, SUBJECT);
            NatsJsUtils.publishOrExit(js, SUBJECT, MESSAGE_TEXT, 20);
            FetchMessagesExample.simpleFetch(nc, js, "A", 20);
            FetchMessagesExample.simpleFetch(nc, js, "B", 10);
            FetchMessagesExample.simpleFetch(nc, js, "C", 40);
            FetchMessagesExample.simpleFetch(nc, js, "D", 40);
        }
        catch (IOException iOException) {
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private static void simpleFetch(Connection nc, JetStream js, String label, int maxMessages) {
        ConsumerContext consumerContext;
        String consumerName = "fetch-messages-consumer-" + maxMessages + "-messages";
        try {
            StreamContext streamContext = nc.getStreamContext(STREAM);
            consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
        }
        catch (JetStreamApiException | IOException e) {
            return;
        }
        FetchConsumeOptions fetchConsumeOptions = FetchConsumeOptions.builder().maxMessages(maxMessages).expiresIn(2000L).build();
        FetchMessagesExample.printExplanation(label, consumerName, maxMessages);
        int receivedMessages = 0;
        long start = System.currentTimeMillis();
        try (FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions);){
            Message msg = consumer.nextMessage();
            while (msg != null) {
                msg.ack();
                if (++receivedMessages == maxMessages) {
                    msg = null;
                    continue;
                }
                msg = consumer.nextMessage();
            }
        }
        catch (JetStreamApiException | JetStreamStatusCheckedException | IOException | InterruptedException e) {
            System.err.println("Exception should be handled properly, just exiting here.");
            System.exit(-1);
        }
        catch (Exception e) {
            System.err.println("Exception should be handled properly, just exiting here.");
            System.exit(-1);
        }
        long elapsed = System.currentTimeMillis() - start;
        FetchMessagesExample.printSummary(receivedMessages, elapsed);
    }

    private static void printSummary(int received, long elapsed) {
        System.out.println("+++ Fetch executed and " + received + " message(s) were received in " + elapsed + "ms\n");
    }

    private static void printExplanation(String label, String name, int maxMessages) {
        System.out.println("--------------------------------------------------------------------------------");
        System.out.println(label + ". " + name);
        switch (label) {
            case "A": 
            case "B": {
                System.out.println("=== Fetch (" + maxMessages + ") is less than or equal to available messages (" + 20 + ")");
                System.out.println("=== nextMessage() will return null when consume is done");
                break;
            }
            case "C": {
                System.out.println("=== Fetch (" + maxMessages + ") is larger than available messages (" + 20 + ")");
                System.out.println("=== FetchConsumeOption \"expires in\" is 2 seconds.");
                System.out.println("=== nextMessage() blocks until expiration when there are no messages available, then returns null.");
                break;
            }
            case "D": {
                System.out.println("=== Fetch (" + maxMessages + ") is larger than available messages (0)");
                System.out.println("=== FetchConsumeOption \"expires in\" is 2 seconds.");
                System.out.println("=== nextMessage() blocks until expiration when there are no messages available, then returns null.");
            }
        }
    }
}

