/*
 * Decompiled with CFR 0.152.
 */
package com.clivern.kafka;

import com.clivern.kafka.Configs;
import com.clivern.kafka.FailureCallbackInterface;
import com.clivern.kafka.HandlerCallbackInterface;
import com.clivern.kafka.SuccessCallbackInterface;
import com.clivern.kafka.exception.MissingHandler;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    private Configs configs;
    private KafkaConsumer<String, String> consumer;
    private HandlerCallbackInterface handlerCallback;
    private SuccessCallbackInterface onSuccessCallback;
    private FailureCallbackInterface onFailureCallback;

    public Consumer(Configs configs) {
        this.configs = configs;
        this.consumer = new KafkaConsumer(this.configs.getProperties());
    }

    public Consumer handler(HandlerCallbackInterface callback) {
        this.handlerCallback = callback;
        return this;
    }

    public Consumer subscribe(String topic) {
        this.consumer.subscribe(Collections.singleton(topic));
        return this;
    }

    public Consumer onSuccess(SuccessCallbackInterface callback) {
        this.onSuccessCallback = callback;
        return this;
    }

    public Consumer onFailure(FailureCallbackInterface callback) {
        this.onFailureCallback = callback;
        return this;
    }

    public void run() throws MissingHandler {
        if (this.handlerCallback == null) {
            throw new MissingHandler("Error! HandlerCallback is missing");
        }
        while (true) {
            ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
            for (ConsumerRecord record : records) {
                try {
                    this.handlerCallback.trigger(record);
                    if (this.onSuccessCallback == null) continue;
                    this.onSuccessCallback.trigger(record);
                }
                catch (Exception e) {
                    if (this.onFailureCallback == null) continue;
                    this.onFailureCallback.trigger(record, e);
                }
            }
            this.consumer.commitAsync();
        }
    }
}

