/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.events.process;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.events.config.EventsRuntimeConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ReactiveMessagingEventPublisher
implements EventPublisher {
    private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class);
    @Inject
    ObjectMapper json;
    @Inject
    @Channel(value="kogito-processinstances-events")
    @OnOverflow(value=OnOverflow.Strategy.UNBOUNDED_BUFFER)
    MutinyEmitter<String> processInstancesEventsEmitter;
    private BiConsumer<MutinyEmitter<String>, Message<String>> processInstanceConsumer;
    @Inject
    @Channel(value="kogito-processdefinitions-events")
    MutinyEmitter<String> processDefinitionEventsEmitter;
    private BiConsumer<MutinyEmitter<String>, Message<String>> processDefinitionConsumer;
    @Inject
    @Channel(value="kogito-usertaskinstances-events")
    MutinyEmitter<String> userTasksEventsEmitter;
    private BiConsumer<MutinyEmitter<String>, Message<String>> userTaskConsumer;
    @Inject
    EventsRuntimeConfig eventsRuntimeConfig;
    @Inject
    Instance<MessageDecoratorProvider> decoratorProviderInstance;
    private MessageDecoratorProvider decoratorProvider;

    @PostConstruct
    public void init() {
        this.decoratorProvider = this.decoratorProviderInstance.isResolvable() ? (MessageDecoratorProvider)this.decoratorProviderInstance.get() : null;
        this.processInstanceConsumer = this.eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
        this.processDefinitionConsumer = this.eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
        this.userTaskConsumer = this.eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
    }

    public void publish(DataEvent<?> event) {
        switch (event.getType()) {
            case "ProcessDefinitionEvent": {
                if (!this.eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) break;
                this.publishToTopic(this.processDefinitionConsumer, event, this.processDefinitionEventsEmitter, "kogito-processdefinitions-events");
                break;
            }
            case "ProcessInstanceErrorDataEvent": 
            case "ProcessInstanceNodeDataEvent": 
            case "ProcessInstanceSLADataEvent": 
            case "ProcessInstanceStateDataEvent": 
            case "ProcessInstanceVariableDataEvent": {
                if (!this.eventsRuntimeConfig.isProcessInstancesEventsEnabled()) break;
                this.publishToTopic(this.processInstanceConsumer, event, this.processInstancesEventsEmitter, "kogito-processinstances-events");
                break;
            }
            case "UserTaskInstanceAssignmentDataEvent": 
            case "UserTaskInstanceAttachmentDataEvent": 
            case "UserTaskInstanceCommentDataEvent": 
            case "UserTaskInstanceDeadlineDataEvent": 
            case "UserTaskInstanceStateDataEvent": 
            case "UserTaskInstanceVariableDataEvent": {
                if (!this.eventsRuntimeConfig.isUserTasksEventsEnabled()) break;
                this.publishToTopic(this.userTaskConsumer, event, this.userTasksEventsEmitter, "kogito-usertaskinstances-events");
                break;
            }
            default: {
                logger.debug("Unknown type of event '{}', ignoring for this publisher", (Object)event.getType());
            }
        }
    }

    public void publish(Collection<DataEvent<?>> events) {
        for (DataEvent<?> event : events) {
            this.publish(event);
        }
    }

    protected void publishToTopic(BiConsumer<MutinyEmitter<String>, Message<String>> consumer, DataEvent<?> event, MutinyEmitter<String> emitter, String topic) {
        logger.debug("About to publish event {} to topic {}", event, (Object)topic);
        Message<String> message = null;
        try {
            String eventString = this.json.writeValueAsString(event);
            logger.debug("Event payload '{}'", (Object)eventString);
            message = this.decorateMessage((Message<String>)ContextAwareMessage.of((Object)eventString));
        }
        catch (Exception e) {
            logger.error("Error while creating event to topic {} for event {}", (Object)topic, event);
        }
        if (message != null) {
            consumer.accept(emitter, message);
        }
    }

    protected CompletionStage<Void> onAck(Message<String> message) {
        logger.debug("Successfully published message {}", message.getPayload());
        return CompletableFuture.completedFuture(null);
    }

    protected CompletionStage<Void> onNack(Throwable reason, Message<String> message) {
        logger.error("Error while publishing message {}", message, (Object)reason);
        return CompletableFuture.completedFuture(null);
    }

    protected Message<String> decorateMessage(Message<String> message) {
        return this.decoratorProvider != null ? this.decoratorProvider.decorate(message) : message;
    }

    private class BlockingMessageEmitter
    implements BiConsumer<MutinyEmitter<String>, Message<String>> {
        private BlockingMessageEmitter() {
        }

        @Override
        public void accept(MutinyEmitter<String> emitter, Message<String> message) {
            emitter.sendMessageAndAwait(message);
            logger.debug("Successfully published message {}", message.getPayload());
        }
    }

    private class ReactiveMessageEmitter
    implements BiConsumer<MutinyEmitter<String>, Message<String>> {
        private ReactiveMessageEmitter() {
        }

        @Override
        public void accept(MutinyEmitter<String> emitter, Message<String> message) {
            emitter.sendMessageAndForget(message.withAck(() -> ReactiveMessagingEventPublisher.this.onAck(message)).withNack(reason -> ReactiveMessagingEventPublisher.this.onNack((Throwable)reason, message)));
        }
    }
}

