/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.ChannelRegistar;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MediatorFactory;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.WeavingException;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.extension.CollectedMediatorMetadata;
import io.smallrye.reactive.messaging.extension.EmitterImpl;
import io.smallrye.reactive.messaging.extension.LazySource;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.DeploymentException;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Application-scoped component that collects the methods annotated with {@link Incoming} and/or
 * {@link Outgoing}, creates one mediator per collected method, and connects mediators, emitters
 * and the channels known to the {@link ChannelRegistry} once deployment is complete.
 *
 * <p>Typical life cycle: the {@code analyze} methods are called for every candidate bean,
 * {@link #initializeEmitters(List)} registers the emitters, and {@link #initializeAndRun()}
 * creates the mediators and weaves everything together.
 */
@ApplicationScoped
public class MediatorManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(MediatorManager.class);

    /**
     * System property enabling strict mode. In strict mode, weaving fails with a
     * {@link WeavingException} when some mediators cannot be connected; otherwise the problem is
     * only logged as a warning.
     */
    public static final String STRICT_MODE_PROPERTY = "smallrye-messaging-strict-binding";

    private final boolean strictMode;
    private final CollectedMediatorMetadata collected = new CollectedMediatorMetadata();
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();
    private final List<AbstractMediator> mediators = new ArrayList<>();
    @Inject
    @Any
    Instance<ChannelRegistar> streamRegistars;
    @Inject
    MediatorFactory mediatorFactory;
    @Inject
    ChannelRegistry channelRegistry;
    @Inject
    BeanManager beanManager;
    private boolean initialized;

    /**
     * Creates the manager and reads the strict-mode flag from the
     * {@value #STRICT_MODE_PROPERTY} system property (defaults to {@code false}).
     */
    public MediatorManager() {
        this.strictMode = Boolean.parseBoolean(System.getProperty(STRICT_MODE_PROPERTY, "false"));
        if (this.strictMode) {
            LOGGER.debug("Strict mode enabled");
        }
    }

    /**
     * @return {@code true} once {@link #initializeAndRun()} has completed the weaving phase
     *         without throwing
     */
    public boolean isInitialized() {
        return this.initialized;
    }

    /**
     * Collects every method of the annotated type that carries {@link Incoming} or
     * {@link Outgoing}.
     *
     * @param annotatedType the CDI view of the bean class to scan
     * @param bean the bean the collected methods belong to
     * @param <T> the bean type
     */
    public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
        LOGGER.info("Scanning Type: {}", annotatedType.getJavaClass());
        annotatedType.getMethods().stream()
                .filter(method -> method.isAnnotationPresent(Incoming.class) || method.isAnnotationPresent(Outgoing.class))
                .forEach(method -> this.collected.add(method.getJavaMember(), bean));
    }

    /**
     * Collects every method declared by the given class or one of its superclasses that carries
     * {@link Incoming} or {@link Outgoing}. {@code Object} itself is not scanned.
     *
     * @param beanClass the class to scan; the walk stops when the hierarchy is exhausted
     * @param bean the bean the collected methods belong to
     * @param <T> the bean type
     */
    public <T> void analyze(Class<?> beanClass, Bean<T> bean) {
        // getSuperclass() returns null for interfaces (and for Object/primitives), so the walk
        // must stop on null as well as on Object.class to avoid a NullPointerException.
        for (Class<?> current = beanClass; current != null && current != Object.class; current = current.getSuperclass()) {
            Arrays.stream(current.getDeclaredMethods())
                    .filter(MediatorManager::isMessagingMethod)
                    .forEach(method -> this.collected.add(method, bean));
        }
    }

    /** Tells whether the method is annotated with {@link Incoming} or {@link Outgoing}. */
    private static boolean isMessagingMethod(Method method) {
        return method.isAnnotationPresent(Incoming.class) || method.isAnnotationPresent(Outgoing.class);
    }

    /** Cancels and forgets every tracked subscription when the bean is destroyed. */
    @PreDestroy
    void shutdown() {
        LOGGER.info("Cancel subscriptions");
        this.subscriptions.forEach(Subscription::cancel);
        this.subscriptions.clear();
    }

    /**
     * Initializes the channel registars, creates and initializes a mediator for every collected
     * method, then weaves mediators, emitters and channels together.
     *
     * @throws IllegalStateException if the manager has already been initialized
     * @throws DeploymentException if the weaving fails with a {@link WeavingException}
     */
    public void initializeAndRun() {
        if (this.initialized) {
            throw new IllegalStateException("MediatorManager was already initialized!");
        }
        LOGGER.info("Deployment done... start processing");
        this.streamRegistars.stream().forEach(ChannelRegistar::initialize);
        // Read before the mediators register their own subscribers below.
        // NOTE(review): presumably these are the sinks not backed by a mediator - confirm
        // whether getOutgoingNames() returns a snapshot or a live view.
        Set<String> unmanagedSubscribers = this.channelRegistry.getOutgoingNames();
        LOGGER.info("Initializing mediators");
        this.collected.mediators().forEach(this::initializeMediator);
        try {
            this.weaving(unmanagedSubscribers);
        } catch (WeavingException e) {
            throw new DeploymentException(e);
        }
    }

    /**
     * Creates the mediator for the given configuration, attaches its invoker and bean instance,
     * and registers it in the channel registry when it is a pure publisher or subscriber.
     * Failures are logged and the mediator is left un-registered.
     */
    private void initializeMediator(MediatorConfiguration configuration) {
        AbstractMediator mediator = this.createMediator(configuration);
        LOGGER.debug("Initializing {}", mediator.getMethodAsString());
        // NOTE(review): on the early returns below the mediator has already been added to
        // `mediators` by createMediator, so it still takes part in the weaving phase.
        if (configuration.getInvokerClass() != null) {
            try {
                mediator.setInvoker(configuration.getInvokerClass().newInstance());
            } catch (IllegalAccessException | InstantiationException e) {
                LOGGER.error("Unable to create invoker instance of " + configuration.getInvokerClass(), e);
                return;
            }
        }
        try {
            Object beanInstance = this.beanManager.getReference(configuration.getBean(), Object.class,
                    this.beanManager.createCreationalContext(configuration.getBean()));
            mediator.initialize(beanInstance);
        } catch (Throwable e) {
            // Deliberately broad: any failure while obtaining or initializing the bean only
            // disables this mediator instead of aborting the whole initialization.
            LOGGER.error("Unable to initialize mediator: " + mediator.getMethodAsString(), e);
            return;
        }
        if (mediator.getConfiguration().shape() == Shape.PUBLISHER) {
            LOGGER.debug("Registering {} as publisher {}", mediator.getConfiguration().methodAsString(), mediator.getConfiguration().getOutgoing());
            this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
        }
        if (mediator.getConfiguration().shape() == Shape.SUBSCRIBER) {
            LOGGER.debug("Registering {} as subscriber {}", mediator.getConfiguration().methodAsString(), mediator.getConfiguration().getIncoming());
            this.channelRegistry.register(mediator.getConfiguration().getIncoming(), mediator.getComputedSubscriber());
        }
    }

    /**
     * Connects every mediator to its upstream, configures the lazy (merged) sources, starts the
     * connected subscribers and finally attaches mediators and emitters to the given sinks.
     * Marks the manager as initialized on success.
     *
     * @param unmanagedSubscribers names of the channels whose subscribers must be fed explicitly
     * @throws WeavingException in strict mode when some mediators cannot be connected, or when a
     *         mediator without a merge policy has several upstream sources
     */
    private void weaving(Set<String> unmanagedSubscribers) {
        LOGGER.info("Connecting mediators");
        List<LazySource> lazy = new ArrayList<>();
        List<AbstractMediator> unsatisfied = this.getAllNonSatisfiedMediators();
        // Connecting a mediator may publish a new stream that satisfies another one, so iterate
        // until everything is connected or a full pass makes no progress.
        while (!unsatisfied.isEmpty()) {
            int unsatisfiedBeforePass = unsatisfied.size();
            unsatisfied.forEach(mediator -> this.connectToUpstreamIfAvailable(mediator, lazy));
            unsatisfied = this.getAllNonSatisfiedMediators();
            if (unsatisfied.size() == unsatisfiedBeforePass) {
                this.reportUnsatisfied(unsatisfied);
                break;
            }
        }
        lazy.forEach(source -> source.configure(this.channelRegistry, LOGGER));
        this.mediators.stream()
                .filter(m -> m.configuration().shape() == Shape.SUBSCRIBER)
                .filter(AbstractMediator::isConnected)
                .forEach(AbstractMediator::run);
        for (String name : unmanagedSubscribers) {
            List<AbstractMediator> upstreamMediators = this.lookupForMediatorsWithMatchingDownstream(name);
            EmitterImpl emitter = (EmitterImpl) this.channelRegistry.getEmitter(name);
            List<SubscriberBuilder<? extends Message, Void>> subscribers = this.channelRegistry.getSubscribers(name);
            for (AbstractMediator mediator : upstreamMediators) {
                this.connectMediatorToSinks(mediator, name, subscribers);
            }
            // The emitter only feeds the sink when no mediator produces to that channel.
            if (upstreamMediators.isEmpty() && emitter != null) {
                this.connectEmitterToSinks(emitter, name, subscribers);
            }
        }
        this.initialized = true;
    }

    /**
     * Connects the mediator to its incoming channel when at least one publisher is registered
     * for it, and registers the resulting stream when the mediator also has an outgoing channel.
     */
    private void connectToUpstreamIfAvailable(AbstractMediator mediator, List<LazySource> lazy) {
        LOGGER.info("Attempt to resolve {}", mediator.getMethodAsString());
        List<PublisherBuilder<? extends Message>> sources = this.channelRegistry.getPublishers(mediator.configuration().getIncoming());
        this.getAggregatedSource(sources, mediator, lazy).ifPresent(publisher -> {
            mediator.connectToUpstream(publisher);
            LOGGER.info("Connecting {} to `{}` ({})", mediator.getMethodAsString(), mediator.configuration().getIncoming(), publisher);
            if (mediator.configuration().getOutgoing() != null) {
                this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
            }
        });
    }

    /**
     * Reports the mediators that could not be connected: throws in strict mode, logs warnings
     * otherwise.
     */
    private void reportUnsatisfied(List<AbstractMediator> unsatisfied) {
        List<String> methods = unsatisfied.stream()
                .map(m -> m.configuration().methodAsString())
                .collect(Collectors.toList());
        if (this.strictMode) {
            throw new WeavingException("Impossible to bind mediators, some mediators are not connected: " + methods
                    + ", available publishers:" + this.channelRegistry.getIncomingNames()
                    + ", available emitters: " + this.channelRegistry.getEmitterNames());
        }
        LOGGER.warn("Impossible to bind mediators, some mediators are not connected: {}", methods);
        LOGGER.warn("Available publishers: {}", this.channelRegistry.getIncomingNames());
        LOGGER.warn("Available emitters: {}", this.channelRegistry.getEmitterNames());
    }

    /**
     * Subscribes every given sink to the mediator's stream. A warning is logged when more than
     * one subscriber consumes the same channel; each subscriber gets its own run of the stream.
     */
    private void connectMediatorToSinks(AbstractMediator mediator, String name, List<SubscriberBuilder<? extends Message, Void>> subscribers) {
        // Any count above one is a fan-out: previously exactly two subscribers were silently
        // left unconnected.
        if (subscribers.size() > 1) {
            LOGGER.warn("{} subscribers consuming the stream {}", subscribers.size(), name);
        }
        for (SubscriberBuilder<? extends Message, Void> subscriber : subscribers) {
            LOGGER.info("Connecting method {} to sink {}", mediator.getMethodAsString(), name);
            subscribe(mediator.getStream(), subscriber);
        }
    }

    /**
     * Subscribes every given sink to the emitter's publisher. A warning is logged when more than
     * one subscriber consumes the same channel.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void connectEmitterToSinks(EmitterImpl emitter, String name, List<SubscriberBuilder<? extends Message, Void>> subscribers) {
        // Any count above one is a fan-out: previously exactly two subscribers were silently
        // left unconnected.
        if (subscribers.size() > 1) {
            LOGGER.warn("{} subscribers consuming the stream {}", subscribers.size(), name);
        }
        for (SubscriberBuilder<? extends Message, Void> subscriber : subscribers) {
            LOGGER.info("Connecting emitter to sink {}", name);
            subscribe(ReactiveStreams.fromPublisher(emitter.getPublisher()), subscriber);
        }
    }

    /**
     * Attaches the sink to the source and runs the resulting graph. The element types are only
     * related at runtime, so the wildcards cannot be reconciled statically and raw types are
     * used on purpose.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void subscribe(PublisherBuilder source, SubscriberBuilder<? extends Message, Void> sink) {
        source.to((SubscriberBuilder) sink).run();
    }

    /**
     * @return the mediators whose outgoing channel matches the given name (case-insensitive)
     */
    private List<AbstractMediator> lookupForMediatorsWithMatchingDownstream(String name) {
        return this.mediators.stream()
                .filter(m -> m.configuration().getOutgoing() != null)
                .filter(m -> m.configuration().getOutgoing().equalsIgnoreCase(name))
                .collect(Collectors.toList());
    }

    /** @return the mediators that are not connected yet */
    private List<AbstractMediator> getAllNonSatisfiedMediators() {
        return this.mediators.stream()
                .filter(mediator -> !mediator.isConnected())
                .collect(Collectors.toList());
    }

    /** Creates the mediator for the configuration and records it in the managed list. */
    private AbstractMediator createMediator(MediatorConfiguration configuration) {
        AbstractMediator mediator = this.mediatorFactory.create(configuration);
        LOGGER.debug("Mediator created for {}", configuration.methodAsString());
        this.mediators.add(mediator);
        return mediator;
    }

    /**
     * Selects the upstream of a mediator among the publishers registered for its incoming
     * channel.
     *
     * @param sources the publishers currently registered for the mediator's incoming channel
     * @param mediator the mediator to connect
     * @param lazy receives the {@link LazySource} created when the mediator has a merge policy;
     *        it is configured later, once all publishers are known
     * @return empty when no source is available yet; a lazy merged source when a merge policy is
     *         set; the single source otherwise
     * @throws WeavingException if several sources exist and no merge policy is configured
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Optional<PublisherBuilder<? extends Message>> getAggregatedSource(List<PublisherBuilder<? extends Message>> sources, AbstractMediator mediator, List<LazySource> lazy) {
        if (sources.isEmpty()) {
            return Optional.empty();
        }
        Merge.Mode merge = mediator.getConfiguration().getMerge();
        if (merge != null) {
            LazySource lazySource = new LazySource(mediator.configuration().getIncoming(), merge);
            lazy.add(lazySource);
            return Optional.of(ReactiveStreams.fromPublisher((Publisher) lazySource));
        }
        if (sources.size() > 1) {
            throw new WeavingException(mediator.configuration().getIncoming(), mediator.getMethodAsString(), sources.size());
        }
        return Optional.of(sources.get(0));
    }

    /**
     * Creates an emitter for each given channel name and registers both the emitter and its
     * publisher in the channel registry under that name.
     *
     * @param emitters the names of the channels that need an emitter
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void initializeEmitters(List<String> emitters) {
        for (String name : emitters) {
            EmitterImpl emitter = new EmitterImpl(name);
            Publisher publisher = emitter.getPublisher();
            this.channelRegistry.register(name, (PublisherBuilder<? extends Message>) ReactiveStreams.fromPublisher(publisher));
            this.channelRegistry.register(name, emitter);
        }
    }
}

