/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.addon.cloudevents.quarkus;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.ChannelRegistar;
import io.smallrye.reactive.messaging.DefaultMediatorConfiguration;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.extension.MediatorManager;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.addon.cloudevents.quarkus.ChannelResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@RegisterForReflection
public class QuarkusMultiCloudEventPublisher
implements ChannelRegistar {
    private static final Logger LOGGER = LoggerFactory.getLogger(QuarkusMultiCloudEventPublisher.class);
    @Inject
    private MediatorManager mediatorManager;
    @Inject
    private BeanManager beanManager;
    @Inject
    private ChannelResolver channelResolver;
    private BroadcastProcessor<String> processor;

    @PostConstruct
    private void init() {
        this.processor = BroadcastProcessor.create();
    }

    private Collection<MediatorConfiguration> mediatorConf(final Collection<String> channels) throws NoSuchMethodException {
        return Collections.singletonList(new DefaultMediatorConfiguration(QuarkusMultiCloudEventPublisher.class.getMethod("produce", Message.class), this.beanManager.resolve(this.beanManager.getBeans(QuarkusMultiCloudEventPublisher.class, new Annotation[0]))){

            public List<String> getIncoming() {
                return new ArrayList<String>(channels);
            }

            public Shape shape() {
                return Shape.SUBSCRIBER;
            }

            public MediatorConfiguration.Consumption consumption() {
                return MediatorConfiguration.Consumption.MESSAGE;
            }
        });
    }

    @Produces
    @ApplicationScoped
    @Named(value="kogito_event_publisher")
    public Multi<String> producerFactory() {
        return this.processor;
    }

    public CompletionStage<Void> produce(Message<String> message) {
        LOGGER.debug("Received message from channel {}: {}", (Object)"kogito_incoming_stream", message);
        this.processor.onNext((Object)((String)message.getPayload()));
        return message.ack().exceptionally(e -> {
            LOGGER.error("Failed to ack message", e);
            return null;
        });
    }

    public void initialize() {
        try {
            Collection<String> inputChannels = this.channelResolver.getInputChannels();
            if (!inputChannels.isEmpty()) {
                this.mediatorManager.addAnalyzed(this.mediatorConf(inputChannels));
            }
        }
        catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}

