/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.events.spring;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaEventPublisher
implements EventPublisher {
    private static final String PI_TOPIC_NAME = "kogito-processinstances-events";
    private static final String UI_TOPIC_NAME = "kogito-usertaskinstances-events";
    private static final String VI_TOPIC_NAME = "kogito-variables-events";
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventPublisher.class);
    @Autowired
    private ObjectMapper json;
    @Autowired
    private Environment env;
    @Autowired
    private KafkaTemplate<String, String> eventsEmitter;
    @Value(value="${kogito.events.processinstances.enabled:true}")
    private boolean processInstancesEvents;
    @Value(value="${kogito.events.usertasks.enabled:true}")
    private boolean userTasksEvents;
    @Value(value="${kogito.events.variables.enabled:true}")
    private boolean variablesEvents;

    public void publish(DataEvent<?> event) {
        switch (event.getType()) {
            case "ProcessInstanceEvent": {
                if (!this.processInstancesEvents) break;
                this.publishToTopic(event, PI_TOPIC_NAME);
                break;
            }
            case "UserTaskInstanceEvent": {
                if (!this.userTasksEvents) break;
                this.publishToTopic(event, UI_TOPIC_NAME);
                break;
            }
            case "VariableInstanceEvent": {
                if (!this.variablesEvents) break;
                this.publishToTopic(event, VI_TOPIC_NAME);
                break;
            }
            default: {
                logger.debug("Unknown type of event '{}', ignoring for this publisher", (Object)event.getType());
            }
        }
    }

    public void publish(Collection<DataEvent<?>> events) {
        for (DataEvent<?> event : events) {
            this.publish(event);
        }
    }

    protected void publishToTopic(DataEvent<?> event, String topic) {
        logger.debug("About to publish event {} to Kafka topic {}", event, (Object)topic);
        try {
            String eventString = this.json.writeValueAsString(event);
            logger.debug("Event payload '{}'", (Object)eventString);
            this.eventsEmitter.send(this.env.getProperty("kogito.addon.events.process.kafka." + topic + ".topic", topic), (Object)eventString);
            logger.debug("Successfully published event {} to topic {}", event, (Object)topic);
        }
        catch (Exception e) {
            logger.error("Error while publishing event to Kafka topic {} for event {}", new Object[]{topic, event, e});
        }
    }
}

