/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.addon.cloudevents.quarkus;

import io.quarkus.runtime.Startup;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import java.util.concurrent.CompletionStage;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Named;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridges the {@code kogito_incoming_stream} reactive-messaging channel to an
 * in-process {@link BroadcastProcessor}, which is exposed to the rest of the
 * application as the CDI bean named {@code kogito_event_publisher}.
 *
 * <p>Every payload received on the incoming channel (or handed directly to
 * {@link #produce(String)}) is pushed into the processor via {@code onNext}.
 *
 * <p>NOTE(review): how the processor treats items emitted while it has no
 * subscribers is defined by Mutiny, not by this class - confirm against the
 * {@code BroadcastProcessor} documentation before relying on delivery guarantees.
 */
@Startup
@ApplicationScoped
public class QuarkusCloudEventPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(QuarkusCloudEventPublisher.class);

    /** Name of the reactive-messaging channel this bean consumes from. */
    private static final String INCOMING_CHANNEL = "kogito_incoming_stream";

    /** CDI name under which the internal event stream is published. */
    private static final String PUBLISHER_NAME = "kogito_event_publisher";

    /** Internal bus: everything passed to {@link #produce(String)} is emitted through it. */
    protected BroadcastProcessor<String> processor = BroadcastProcessor.create();

    /**
     * Exposes the internal processor as an injectable stream of raw event payloads.
     *
     * @return the processor backing this publisher, typed as a {@link Multi}
     */
    @Produces
    @ApplicationScoped
    @Named(value = PUBLISHER_NAME)
    public Multi<String> producerFactory() {
        return processor;
    }

    /**
     * Handles one message from the incoming channel: acknowledges it, then
     * forwards its payload to the internal bus.
     *
     * <p>The acknowledgement happens <em>before</em> the payload is forwarded.
     * A failed acknowledgement is logged and does not prevent forwarding.
     *
     * @param message the message received from the incoming channel
     * @return a stage that completes once the payload has been handed to
     *         {@link #produce(String)}; it completes exceptionally if that call throws
     */
    @Incoming(value = INCOMING_CHANNEL)
    public CompletionStage<Void> onEvent(Message<String> message) {
        LOGGER.debug("Received message from channel {}: {}", INCOMING_CHANNEL, message);
        CompletionStage<Void> acknowledged = message.ack().exceptionally(e -> {
            // Recover from the ack failure so the payload is still forwarded below.
            LOGGER.error("Failed to ack message", e);
            return null;
        });
        // thenAccept already yields CompletionStage<Void>; no dummy "return null" needed.
        return acknowledged.thenAccept(ignored -> produce(message.getPayload()));
    }

    /**
     * Emits a payload on the internal bus.
     *
     * @param message the raw event payload to emit
     */
    public void produce(String message) {
        LOGGER.debug("Producing message to internal bus: {}", message);
        // The processor is a BroadcastProcessor<String>, so the payload is passed as a String.
        processor.onNext(message);
    }
}

