/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.ChannelRegistar;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MediatorFactory;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.WeavingException;
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.extension.CollectedMediatorMetadata;
import io.smallrye.reactive.messaging.extension.EmitterImpl;
import io.smallrye.reactive.messaging.extension.LazySource;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.DeploymentException;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class MediatorManager {
    private static final int DEFAULT_BUFFER_SIZE = 128;
    private static final Logger LOGGER = LoggerFactory.getLogger(MediatorManager.class);
    public static final String STRICT_MODE_PROPERTY = "smallrye-messaging-strict-binding";
    private final boolean strictMode = Boolean.parseBoolean(System.getProperty("smallrye-messaging-strict-binding", "false"));
    private final CollectedMediatorMetadata collected = new CollectedMediatorMetadata(this.strictMode);
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
    private final List<AbstractMediator> mediators = new ArrayList<AbstractMediator>();
    @Inject
    @ConfigProperty(name="mp.messaging.emitter.default-buffer-size", defaultValue="128")
    int defaultBufferSize;
    @Inject
    @ConfigProperty(name="smallrye.messaging.emitter.default-buffer-size", defaultValue="128")
    @Deprecated
    int defaultBufferSizeLegacy;
    @Inject
    @Any
    Instance<ChannelRegistar> streamRegistars;
    @Inject
    MediatorFactory mediatorFactory;
    @Inject
    ChannelRegistry channelRegistry;
    @Inject
    BeanManager beanManager;
    @Inject
    Instance<PublisherDecorator> decorators;
    private volatile boolean initialized;

    public MediatorManager() {
        if (this.strictMode) {
            LOGGER.debug("Strict mode enabled");
        }
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
        LOGGER.info("Scanning Type: {}", (Object)annotatedType.getJavaClass());
        Set methods = annotatedType.getMethods();
        methods.stream().filter(this::hasMediatorAnnotations).forEach(method -> this.collected.add(method.getJavaMember(), bean));
    }

    private <T> boolean hasMediatorAnnotations(AnnotatedMethod<? super T> method) {
        return method.isAnnotationPresent(Incomings.class) || method.isAnnotationPresent(Incoming.class) || method.isAnnotationPresent(Outgoing.class);
    }

    private boolean hasMediatorAnnotations(Method m) {
        return m.isAnnotationPresent(Incomings.class) || m.isAnnotationPresent(Incoming.class) || m.isAnnotationPresent(Outgoing.class);
    }

    public <T> void analyze(Class<?> beanClass, Bean<T> bean) {
        for (Class<?> current = beanClass; current != Object.class; current = current.getSuperclass()) {
            Arrays.stream(current.getDeclaredMethods()).filter(this::hasMediatorAnnotations).forEach(m -> this.collected.add((Method)m, bean));
        }
    }

    public void addAnalyzed(Collection<? extends MediatorConfiguration> mediators) {
        this.collected.addAll(mediators);
    }

    @PreDestroy
    void shutdown() {
        LOGGER.info("Cancel subscriptions");
        this.subscriptions.forEach(Subscription::cancel);
        this.subscriptions.clear();
    }

    public void initializeAndRun() {
        if (this.initialized) {
            throw new IllegalStateException("MediatorManager was already initialized!");
        }
        LOGGER.info("Deployment done... start processing");
        this.streamRegistars.stream().forEach(ChannelRegistar::initialize);
        Set unmanagedSubscribers = this.channelRegistry.getOutgoingNames();
        LOGGER.info("Initializing mediators");
        this.collected.mediators().forEach(configuration -> {
            AbstractMediator mediator = this.createMediator((MediatorConfiguration)configuration);
            LOGGER.debug("Initializing {}", (Object)mediator.getMethodAsString());
            mediator.setDecorators(this.decorators);
            try {
                Object beanInstance = this.beanManager.getReference(configuration.getBean(), Object.class, this.beanManager.createCreationalContext((Contextual)configuration.getBean()));
                if (configuration.getInvokerClass() != null) {
                    try {
                        Constructor constructorUsingBeanInstance = configuration.getInvokerClass().getConstructor(Object.class);
                        if (constructorUsingBeanInstance != null) {
                            mediator.setInvoker((Invoker)constructorUsingBeanInstance.newInstance(beanInstance));
                        } else {
                            mediator.setInvoker((Invoker)configuration.getInvokerClass().newInstance());
                        }
                    }
                    catch (IllegalAccessException | InstantiationException e) {
                        LOGGER.error("Unable to create invoker instance of " + configuration.getInvokerClass(), (Throwable)e);
                        return;
                    }
                }
                mediator.initialize(beanInstance);
            }
            catch (Throwable e) {
                LOGGER.error("Unable to initialize mediator: " + mediator.getMethodAsString(), e);
                return;
            }
            if (mediator.getConfiguration().shape() == Shape.PUBLISHER) {
                LOGGER.debug("Registering {} as publisher {}", (Object)mediator.getConfiguration().methodAsString(), (Object)mediator.getConfiguration().getOutgoing());
                this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
            }
            if (mediator.getConfiguration().shape() == Shape.SUBSCRIBER) {
                List list = mediator.getConfiguration().getIncoming();
                LOGGER.debug("Registering {} as subscriber {}", (Object)mediator.getConfiguration().methodAsString(), (Object)list);
                for (String l : list) {
                    this.channelRegistry.register(l, mediator.getComputedSubscriber());
                }
            }
        });
        try {
            this.weaving(unmanagedSubscribers);
        }
        catch (WeavingException e) {
            throw new DeploymentException((Throwable)e);
        }
    }

    private void weaving(Set<String> unmanagedSubscribers) {
        LOGGER.info("Connecting mediators");
        List<AbstractMediator> unsatisfied = this.getAllNonSatisfiedMediators();
        ArrayList lazy = new ArrayList();
        while (!unsatisfied.isEmpty()) {
            int numberOfUnsatisfiedBeforeLoop = unsatisfied.size();
            unsatisfied.forEach(mediator -> {
                LOGGER.info("Attempt to resolve {}", (Object)mediator.getMethodAsString());
                List list = mediator.configuration().getIncoming();
                if (list.size() == 1) {
                    List sources = this.channelRegistry.getPublishers((String)list.get(0));
                    Optional<PublisherBuilder<? extends Message>> maybeSource = this.getAggregatedSource(sources, (String)list.get(0), (AbstractMediator)mediator, lazy);
                    maybeSource.ifPresent(publisher -> {
                        mediator.connectToUpstream((PublisherBuilder<? extends Message>)publisher);
                        LOGGER.info("Connecting {} to `{}` ({})", new Object[]{mediator.getMethodAsString(), list, publisher});
                        if (mediator.configuration().getOutgoing() != null) {
                            this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
                        }
                    });
                } else {
                    ArrayList upstreams = new ArrayList();
                    for (String sn : list) {
                        List sources = this.channelRegistry.getPublishers(sn);
                        Optional<PublisherBuilder<? extends Message>> maybeSource = this.getAggregatedSource(sources, sn, (AbstractMediator)mediator, lazy);
                        maybeSource.ifPresent(upstreams::add);
                    }
                    if (upstreams.size() == list.size()) {
                        Flowable merged = Flowable.merge((Iterable)upstreams.stream().map(PublisherBuilder::buildRs).collect(Collectors.toList()));
                        mediator.connectToUpstream((PublisherBuilder<? extends Message>)ReactiveStreams.fromPublisher((Publisher)merged));
                        LOGGER.info("Connecting {} to `{}`", (Object)mediator.getMethodAsString(), (Object)list);
                        if (mediator.configuration().getOutgoing() != null) {
                            this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
                        }
                    }
                }
            });
            unsatisfied = this.getAllNonSatisfiedMediators();
            int numberOfUnsatisfiedAfterLoop = unsatisfied.size();
            if (numberOfUnsatisfiedAfterLoop != numberOfUnsatisfiedBeforeLoop) continue;
            if (this.strictMode) {
                throw new WeavingException("Impossible to bind mediators, some mediators are not connected: " + unsatisfied.stream().map(m -> m.configuration().methodAsString()).collect(Collectors.toList()) + ", available publishers:" + this.channelRegistry.getIncomingNames() + ", available emitters: " + this.channelRegistry.getEmitterNames());
            }
            LOGGER.warn("Impossible to bind mediators, some mediators are not connected: {}", unsatisfied.stream().map(m -> m.configuration().methodAsString()).collect(Collectors.toList()));
            LOGGER.warn("Available publishers: {}", (Object)this.channelRegistry.getIncomingNames());
            LOGGER.warn("Available emitters: {}", (Object)this.channelRegistry.getEmitterNames());
            break;
        }
        lazy.forEach(l -> l.configure(this.channelRegistry, LOGGER));
        this.mediators.stream().filter(m -> m.configuration().shape() == Shape.SUBSCRIBER).filter(AbstractMediator::isConnected).forEach(AbstractMediator::run);
        for (String name : unmanagedSubscribers) {
            List<AbstractMediator> list = this.lookupForMediatorsWithMatchingDownstream(name);
            EmitterImpl emitter = (EmitterImpl)this.channelRegistry.getEmitter(name);
            List subscribers = this.channelRegistry.getSubscribers(name);
            for (AbstractMediator mediator2 : list) {
                if (subscribers.size() == 1) {
                    LOGGER.info("Connecting method {} to sink {}", (Object)mediator2.getMethodAsString(), (Object)name);
                    mediator2.getStream().to((SubscriberBuilder)subscribers.get(0)).run();
                    continue;
                }
                if (subscribers.size() <= 2) continue;
                LOGGER.warn("{} subscribers consuming the stream {}", (Object)subscribers.size(), (Object)name);
                subscribers.forEach(s -> {
                    LOGGER.info("Connecting method {} to sink {}", (Object)mediator2.getMethodAsString(), (Object)name);
                    mediator2.getStream().to(s).run();
                });
            }
            if (!list.isEmpty() || emitter == null) continue;
            if (subscribers.size() == 1) {
                LOGGER.info("Connecting emitter to sink {}", (Object)name);
                ReactiveStreams.fromPublisher(emitter.getPublisher()).to((SubscriberBuilder)subscribers.get(0)).run();
                continue;
            }
            if (subscribers.size() <= 2) continue;
            LOGGER.warn("{} subscribers consuming the stream {}", (Object)subscribers.size(), (Object)name);
            subscribers.forEach(s -> {
                LOGGER.info("Connecting emitter to sink {}", (Object)name);
                ReactiveStreams.fromPublisher(emitter.getPublisher()).to(s).run();
            });
        }
        this.initialized = true;
    }

    private List<AbstractMediator> lookupForMediatorsWithMatchingDownstream(String name) {
        return this.mediators.stream().filter(m -> m.configuration().getOutgoing() != null).filter(m -> m.configuration().getOutgoing().equalsIgnoreCase(name)).collect(Collectors.toList());
    }

    private List<AbstractMediator> getAllNonSatisfiedMediators() {
        return this.mediators.stream().filter(mediator -> !mediator.isConnected()).collect(Collectors.toList());
    }

    private AbstractMediator createMediator(MediatorConfiguration configuration) {
        AbstractMediator mediator = this.mediatorFactory.create(configuration);
        LOGGER.debug("Mediator created for {}", (Object)configuration.methodAsString());
        this.mediators.add(mediator);
        return mediator;
    }

    private Optional<PublisherBuilder<? extends Message>> getAggregatedSource(List<PublisherBuilder<? extends Message>> sources, String sourceName, AbstractMediator mediator, List<LazySource> lazy) {
        if (sources.isEmpty()) {
            return Optional.empty();
        }
        Merge.Mode merge = mediator.getConfiguration().getMerge();
        if (merge != null) {
            LazySource lazySource = new LazySource(sourceName, merge);
            lazy.add(lazySource);
            return Optional.of(ReactiveStreams.fromPublisher((Publisher)lazySource));
        }
        if (sources.size() > 1) {
            throw new WeavingException(sourceName, mediator.getMethodAsString(), sources.size());
        }
        return Optional.of(sources.get(0));
    }

    public void initializeEmitters(Map<String, OnOverflow> emitters) {
        for (Map.Entry<String, OnOverflow> e : emitters.entrySet()) {
            int bufferSize = this.getDefaultBufferSize();
            if (e.getValue() != null) {
                this.initializeEmitter(e.getKey(), e.getValue().value().name(), e.getValue().bufferSize(), bufferSize);
                continue;
            }
            this.initializeEmitter(e.getKey(), null, bufferSize, bufferSize);
        }
    }

    private int getDefaultBufferSize() {
        if (this.defaultBufferSize == 128 && this.defaultBufferSizeLegacy != 128) {
            return this.defaultBufferSizeLegacy;
        }
        return this.defaultBufferSize;
    }

    public void initializeEmitter(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
        EmitterImpl emitter = new EmitterImpl(name, overFlowStrategy, bufferSize, defaultBufferSize);
        Publisher publisher = emitter.getPublisher();
        this.channelRegistry.register(name, ReactiveStreams.fromPublisher(publisher));
        this.channelRegistry.register(name, emitter);
    }
}

