/*
 * Decompiled with CFR 0.152.
 */
package io.streamzi.cloudevents.kafka;

import io.streamzi.cloudevents.CloudEvent;
import io.streamzi.cloudevents.cdi.EventTypeQualifier;
import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import org.aerogear.kafka.cdi.annotation.Consumer;
import org.aerogear.kafka.cdi.annotation.KafkaConfig;

/**
 * Bridges CloudEvents consumed from a Kafka topic into the CDI event system.
 *
 * <p>Each consumed {@link CloudEvent} is tagged (when it carries an extension map)
 * with a marker identifying this bridge, and is then fired as a CDI event qualified
 * by an {@link EventTypeQualifier} built from the event's type.
 *
 * <p>NOTE(review): the {@code #{...}} placeholders in the annotations are presumably
 * resolved by the kafka-cdi extension at deployment time - confirm against its docs.
 */
@KafkaConfig(bootstrapServers="#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class KafkaCloudEventConsumer {
    private static final Logger logger = Logger.getLogger(KafkaCloudEventConsumer.class.getName());

    /** Extension key under which this bridge records itself on the event. */
    private static final String PROCESSOR_KEY = "processor";

    /** Extension value identifying this bridge. */
    private static final String PROCESSOR_NAME = "kafka-cdi";

    @Inject
    private Event<CloudEvent<?>> cloudEventSource;

    /**
     * Receives a CloudEvent from the bridge topic and re-fires it as a CDI event.
     *
     * <p>Failures while firing the CDI event are logged and not rethrown. A
     * {@code null} event is logged and skipped.
     *
     * @param ce the CloudEvent delivered by the Kafka consumer; may be {@code null}
     */
    @Consumer(topics={"#{CLOUD_EVENT_BRIDGE_TOPIC}"}, groupId="jCloudEvent_group")
    public void eventBridge(CloudEvent<?> ce) {
        if (ce == null) {
            // Nothing to dispatch; avoid a NullPointerException on the first dereference.
            logger.warning("Ignoring null CloudEvent received from Kafka");
            return;
        }

        final String eventType = ce.getEventType();
        logger.log(Level.INFO, "Processing CloudEvent type: {0}", eventType);

        tagWithProcessor(ce);

        try {
            this.cloudEventSource.select(new EventTypeQualifier(eventType)).fire(ce);
            logger.info("Dispatched to CDI event system");
        }
        catch (Exception e) {
            // Boundary catch: a failing observer must not propagate out of the consumer callback.
            logger.log(Level.SEVERE, "Error submitting CDI event", e);
        }
    }

    /**
     * Records this bridge in the event's extension map, if the event has one.
     *
     * <p>Events without extensions are left untouched rather than failing on an
     * unchecked {@link Optional#get()}.
     *
     * @param ce the event to tag; must not be {@code null}
     */
    private static void tagWithProcessor(CloudEvent<?> ce) {
        final Optional<?> extensions = ce.getExtensions();
        if (extensions == null || !extensions.isPresent()) {
            logger.fine("CloudEvent carries no extensions; skipping processor tag");
            return;
        }

        final Object candidate = extensions.get();
        if (candidate instanceof Map) {
            // The extension map's generic type is not visible here; the original code
            // performed the same (raw) cast before inserting a String key and value.
            @SuppressWarnings("unchecked")
            final Map<String, Object> extensionMap = (Map<String, Object>) candidate;
            extensionMap.put(PROCESSOR_KEY, PROCESSOR_NAME);
        }
    }
}

