/*
 * Decompiled with CFR 0.152.
 */
package org.spin.eca56.util;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.spin.eca56.util.MapDeserializer;

public class KafkaTest {
    public static void main(String[] args) throws InterruptedException {
        String host = "127.0.0.1";
        String port = "29092";
        String topic = "your_topic";
        if (args != null && args.length > 0) {
            host = args[0];
            port = args[1];
            topic = args[2];
        }
        String completeUrl = host + ":" + port;
        while (true) {
            int intents;
            if ((intents = 4) == 0) {
                intents = 100;
            }
            HashMap<String, Object> consumerConfigs = new HashMap<String, Object>();
            consumerConfigs.put("client.id", "erp2");
            consumerConfigs.put("bootstrap.servers", completeUrl);
            consumerConfigs.put("group.id", "foo01");
            consumerConfigs.put("key.deserializer", StringDeserializer.class.getName());
            consumerConfigs.put("value.deserializer", MapDeserializer.class.getName());
            try (KafkaConsumer consumer = new KafkaConsumer(consumerConfigs);){
                consumer.subscribe(Arrays.asList(topic));
                AtomicInteger iterate = new AtomicInteger(0);
                AtomicInteger errors = new AtomicInteger();
                while (iterate.incrementAndGet() < intents) {
                    System.out.println("Intent: " + iterate.get());
                    ConsumerRecords records = consumer.poll(Duration.ofSeconds(10L));
                    records.forEach(record -> {
                        try {
                            System.out.println(record.value());
                        }
                        catch (Exception e) {
                            errors.addAndGet(1);
                        }
                    });
                    consumer.commitSync();
                }
                consumer.unsubscribe();
                consumer.close(Duration.ofSeconds(1L));
            }
            System.out.println("Waiting......");
            Thread.sleep(10000L);
        }
    }
}

