/*
 * Decompiled with CFR 0.152.
 */
package dev.lydtech.component.framework.client.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaClient.class);
    protected String brokerUrl;
    private static KafkaClient instance;
    private KafkaProducer defaultProducer;

    private KafkaClient() {
        String kafkaHost = Optional.ofNullable(System.getProperty("docker.host")).orElse("localhost");
        String kafkaPort = Optional.ofNullable(System.getProperty("kafka.mapped.port")).orElseThrow(() -> new RuntimeException("kafka.mapped.port property not found"));
        this.brokerUrl = "http://" + kafkaHost + ":" + kafkaPort;
        log.info("Kafka broker URL is: " + this.brokerUrl);
        this.defaultProducer = this.createProducer();
    }

    public static synchronized KafkaClient getInstance() {
        if (instance == null) {
            instance = new KafkaClient();
        }
        return instance;
    }

    protected String getBrokerUrl() {
        return this.brokerUrl;
    }

    public Consumer createConsumer(String groupId, String topic) {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.brokerUrl);
        config.put("group.id", groupId + "-" + topic);
        config.put("key.deserializer", StringDeserializer.class);
        config.put("value.deserializer", StringDeserializer.class);
        config.put("auto.offset.reset", "latest");
        config.put("metadata.max.age.ms", (Object)1000);
        KafkaConsumer consumer = new KafkaConsumer(config);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public KafkaProducer<Long, String> createProducer() {
        return this.createProducer(null);
    }

    public KafkaProducer<Long, String> createProducer(Properties additionalConfig) {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.brokerUrl);
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        if (additionalConfig != null && !additionalConfig.isEmpty()) {
            config.putAll((Map<?, ?>)additionalConfig);
        }
        return new KafkaProducer(config);
    }

    public RecordMetadata sendMessage(String topic, String key, Object payload) throws Exception {
        return this.sendMessage((Producer)this.defaultProducer, topic, key, payload, null);
    }

    public RecordMetadata sendMessage(Producer producer, String topic, String key, Object payload) throws Exception {
        return this.sendMessage(producer, topic, key, payload, null);
    }

    public RecordMetadata sendMessage(String topic, String key, Object payload, Map<String, String> headers) throws Exception {
        return this.sendMessage((Producer)this.defaultProducer, topic, key, payload, headers);
    }

    public RecordMetadata sendMessage(Producer producer, String topic, String key, Object payload, Map<String, String> headers) throws Exception {
        ArrayList recordHeaders = new ArrayList();
        if (headers != null && headers.size() > 0) {
            headers.forEach((headerKey, headerValue) -> recordHeaders.add(new RecordHeader(headerKey, headerValue != null ? headerValue.getBytes() : null)));
        }
        ProducerRecord record = new ProducerRecord(topic, null, (Object)key, payload, recordHeaders);
        RecordMetadata metadata = (RecordMetadata)producer.send(record).get();
        log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d)", record.key(), record.value(), metadata.topic(), metadata.partition(), metadata.offset()));
        return metadata;
    }

    public Future<RecordMetadata> sendMessageAsync(String topic, String key, Object payload) {
        return this.sendMessageAsync((Producer)this.defaultProducer, topic, key, payload, null);
    }

    public Future<RecordMetadata> sendMessageAsync(Producer producer, String topic, String key, Object payload) {
        return this.sendMessageAsync(producer, topic, key, payload, null);
    }

    public Future<RecordMetadata> sendMessageAsync(String topic, String key, Object payload, Map<String, String> headers) {
        return this.sendMessageAsync((Producer)this.defaultProducer, topic, key, payload, headers);
    }

    public Future<RecordMetadata> sendMessageAsync(Producer producer, String topic, String key, Object payload, Map<String, String> headers) {
        ArrayList recordHeaders = new ArrayList();
        if (headers != null && headers.size() > 0) {
            headers.forEach((headerKey, headerValue) -> recordHeaders.add(new RecordHeader(headerKey, headerValue != null ? headerValue.getBytes() : null)));
        }
        ProducerRecord record = new ProducerRecord(topic, null, (Object)key, payload, recordHeaders);
        return producer.send(record);
    }

    public <T> List<ConsumerRecord<String, T>> consumeAndAssert(String testName, Consumer consumer, int expectedEventCount, int furtherPolls) throws Exception {
        AtomicInteger totalReceivedEvents = new AtomicInteger();
        AtomicInteger totalExtraPolls = new AtomicInteger(-1);
        AtomicInteger pollCount = new AtomicInteger();
        ArrayList events = new ArrayList();
        Awaitility.await().atMost(1L, TimeUnit.MINUTES).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100L));
            consumerRecords.forEach(record -> {
                log.info(testName + " - received: " + record.value());
                totalReceivedEvents.incrementAndGet();
                events.add((ConsumerRecord)record);
            });
            if (totalReceivedEvents.get() == expectedEventCount) {
                totalExtraPolls.incrementAndGet();
            }
            pollCount.getAndIncrement();
            log.info(testName + " - poll count: " + pollCount.get() + " - received count: " + totalReceivedEvents.get());
            return totalReceivedEvents.get() == expectedEventCount && totalExtraPolls.get() == furtherPolls;
        });
        return events;
    }
}

