/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.events.rm;

import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;
import java.util.Collection;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventPublisher} that serializes events to JSON and sends them through
 * reactive-messaging emitters.
 *
 * <p>Events are routed by {@link DataEvent#getType()}:
 * <ul>
 *   <li>{@code ProcessInstanceEvent} goes to the {@code kogito-processinstances-events} stream;</li>
 *   <li>{@code UserTaskInstanceEvent} goes to the {@code kogito-usertaskinstances-events} stream.</li>
 * </ul>
 * Each route can be switched off with its configuration property
 * ({@code kogito.events.processinstances.enabled} and
 * {@code kogito.events.usertasks.enabled}, both defaulting to {@code true}).
 * Events of any other type are ignored with a warning.
 */
@Singleton
public class ReactiveMessagingEventPublisher
implements EventPublisher {
    private static final String PI_TOPIC_NAME = "kogito-processinstances-events";
    private static final String UI_TOPIC_NAME = "kogito-usertaskinstances-events";
    private static final String PROCESS_INSTANCE_EVENT_TYPE = "ProcessInstanceEvent";
    private static final String USER_TASK_INSTANCE_EVENT_TYPE = "UserTaskInstanceEvent";
    private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class);

    /** JSON-B instance used to serialize every outgoing event. */
    private final Jsonb jsonb = JsonbBuilder.create();

    /** Emitter bound to the process-instance events stream. */
    @Inject
    @Stream(PI_TOPIC_NAME)
    Emitter<String> processInstancesEventsEmitter;

    /** Emitter bound to the user-task-instance events stream. */
    @Inject
    @Stream(UI_TOPIC_NAME)
    Emitter<String> userTasksEventsEmitter;

    /** Whether process-instance events are published at all. */
    @Inject
    @ConfigProperty(name="kogito.events.processinstances.enabled", defaultValue="true")
    Boolean processInstancesEvents;

    /** Whether user-task-instance events are published at all. */
    @Inject
    @ConfigProperty(name="kogito.events.usertasks.enabled", defaultValue="true")
    Boolean userTasksEvents;

    /**
     * Publishes a single event to the stream that matches its type.
     *
     * <p>An event whose type is recognized but whose publishing is disabled by
     * configuration is skipped with a debug message; an event with an
     * unrecognized (or {@code null}) type is skipped with a warning.
     *
     * @param event the event to publish
     */
    public void publish(DataEvent<?> event) {
        String type = event.getType();
        // Constant-first comparison keeps a null type from throwing; it simply
        // ends up in the "unknown type" branch below.
        if (PROCESS_INSTANCE_EVENT_TYPE.equals(type)) {
            if (processInstancesEvents) {
                publishToTopic(event, processInstancesEventsEmitter, PI_TOPIC_NAME);
            } else {
                logger.debug("Publishing of '{}' events is disabled, ignoring", type);
            }
        } else if (USER_TASK_INSTANCE_EVENT_TYPE.equals(type)) {
            if (userTasksEvents) {
                publishToTopic(event, userTasksEventsEmitter, UI_TOPIC_NAME);
            } else {
                logger.debug("Publishing of '{}' events is disabled, ignoring", type);
            }
        } else {
            logger.warn("Unknown type of event '{}', ignoring", type);
        }
    }

    /**
     * Publishes every event of the collection, in iteration order, by
     * delegating to {@link #publish(DataEvent)}.
     *
     * @param events the events to publish
     */
    public void publish(Collection<DataEvent<?>> events) {
        for (DataEvent<?> event : events) {
            publish(event);
        }
    }

    /**
     * Serializes the event to JSON and sends it through the given emitter.
     *
     * <p>Any failure during serialization or sending is logged and swallowed,
     * so a failed publication never propagates to the caller.
     *
     * @param event the event to serialize and send
     * @param emitter the emitter used to send the JSON payload
     * @param topic the topic name, used only in log messages
     */
    protected void publishToTopic(DataEvent<?> event, Emitter<String> emitter, String topic) {
        // isRequested() is true when downstream has asked for items, so the
        // "not ready" diagnostic belongs to the negated case. The send is still
        // attempted either way, as before.
        if (!emitter.isRequested()) {
            logger.debug("Emitter {} is not ready to send messages", topic);
        }
        logger.debug("About to publish event {} to topic {}", event, topic);
        try {
            String eventString = jsonb.toJson(event);
            logger.debug("Event payload '{}'", eventString);
            emitter.send(eventString);
            logger.debug("Successfully published event {} to topic {}", event, topic);
        }
        catch (Exception e) {
            // The trailing throwable is logged by SLF4J with its stack trace.
            logger.error("Error while publishing event to topic {} for event {}", topic, event, e);
        }
    }
}

