/*
 * Decompiled with CFR 0.152.
 */
package io.streamzi.openshift.dataflow.container.kafka;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.streamzi.cloudevents.CloudEvent;
import io.streamzi.cloudevents.impl.CloudEventImpl;
import io.streamzi.openshift.dataflow.container.CloudEventInput;
import io.streamzi.openshift.dataflow.container.config.EnvironmentResolver;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * {@link CloudEventInput} implementation that reads JSON-encoded cloud events
 * from a Kafka topic and hands each one to the configured consumer method.
 *
 * <p>The broker address is looked up through
 * {@code EnvironmentResolver.get("STREAMZI_KAFKA_BOOTSTRAP_SERVER")} and the
 * topic name through {@code EnvironmentResolver.get(inputName)}. Polling runs
 * on a dedicated daemon thread started by {@link #startInput()} and ended by
 * {@link #stopInput()}.
 *
 * <p>Once stopped, an instance is not restarted by calling
 * {@link #startInput()} again: the consumer reference is kept after it has
 * been closed and the stop flag is never cleared.
 */
public class KafkaCloudEventInputImpl extends CloudEventInput implements Runnable {
    private static final Logger logger = Logger.getLogger(KafkaCloudEventInputImpl.class.getName());

    /** How long a single {@code poll} call may block, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final ObjectMapper mapper;
    private final String topicName;
    private final String bootstrapServers = EnvironmentResolver.get("STREAMZI_KAFKA_BOOTSTRAP_SERVER");
    private Consumer<String, String> consumer;
    /** Set by {@link #stopInput()}; read by the poll thread, hence volatile. */
    private volatile boolean stopRequested = false;
    private Thread pollThread;

    /**
     * Creates the input and prepares the JSON mapper used to decode records.
     *
     * @param consumerObject target object the consumer method is invoked on
     * @param consumerMethod method invoked once per decoded event
     */
    public KafkaCloudEventInputImpl(Object consumerObject, Method consumerMethod) {
        super(consumerObject, consumerMethod);
        logger.info("Kafka broker defined at: " + this.bootstrapServers);
        this.topicName = EnvironmentResolver.get(this.inputName);
        logger.info("Input will connect to topic: " + this.topicName);
        this.mapper = new ObjectMapper();
        this.mapper.registerModule((Module) new Jdk8Module());
        this.mapper.registerModule((Module) new JavaTimeModule());
        this.mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
        this.mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    }

    /**
     * Creates the Kafka consumer and starts the daemon poll thread.
     *
     * <p>Does nothing if a consumer already exists. If the consumer cannot be
     * created because configuration is missing, no thread is started.
     */
    @Override
    public void startInput() {
        logger.info("Starting input: " + this.inputName);
        if (this.consumer != null) {
            return;
        }
        Consumer<String, String> created = this.createConsumer();
        if (created == null) {
            // Starting the poll thread without a consumer would only make it
            // die immediately with a NullPointerException.
            logger.log(Level.SEVERE, "Input not started, no Kafka consumer available: " + this.inputName);
            return;
        }
        this.consumer = created;
        this.pollThread = new Thread(this);
        this.pollThread.setDaemon(true);
        this.pollThread.start();
    }

    /**
     * Poll loop: reads records until a stop is requested, decodes each value
     * as a {@code CloudEventImpl} and invokes the consumer method with it.
     *
     * <p>A failure while decoding or delivering one record is logged and the
     * loop moves on to the next record. The consumer is always closed on exit.
     *
     * @throws WakeupException if the consumer is woken up while no stop has
     *         been requested
     */
    @Override
    public void run() {
        final Consumer<String, String> active = this.consumer;
        if (active == null) {
            logger.log(Level.SEVERE, "Poll loop invoked without a Kafka consumer");
            return;
        }
        try {
            while (!this.stopRequested) {
                ConsumerRecords<String, String> records = active.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> r : records) {
                    try {
                        logger.info("Read: " + r.value());
                        CloudEvent evt = (CloudEvent) this.mapper.readValue(r.value(), CloudEventImpl.class);
                        this.consumerMethod.invoke(this.consumerObject, evt);
                    }
                    catch (Exception e) {
                        // Pass the throwable so the cause and stack trace are kept;
                        // reflective invocation failures often carry a null message.
                        logger.log(Level.WARNING, "Error running consumer method: " + e.getMessage(), e);
                    }
                }
            }
        }
        catch (WakeupException we) {
            // A wakeup is the expected way out after stopInput(); anything
            // else is unexpected and is propagated.
            if (!this.stopRequested) {
                logger.info("Wakeup exception");
                throw we;
            }
        }
        finally {
            active.close();
        }
    }

    /**
     * Builds a consumer subscribed to the configured topic.
     *
     * @return the subscribed consumer, or {@code null} when the topic name or
     *         the bootstrap server address is missing
     */
    private Consumer<String, String> createConsumer() {
        if (this.topicName == null || this.bootstrapServers == null) {
            logger.log(Level.SEVERE, "Missing configuration data");
            return null;
        }
        logger.info("Attaching to topic: " + this.topicName);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("group.id", this.processorUuid);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put("auto.commit.interval.ms", 1000);
        KafkaConsumer<String, String> c = new KafkaConsumer<>(properties);
        c.subscribe(Collections.singletonList(this.topicName));
        return c;
    }

    /**
     * Requests the poll loop to stop and wakes the consumer so that a blocked
     * {@code poll} returns promptly.
     */
    @Override
    public void stopInput() {
        this.stopRequested = true;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }
}

