/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.providers.wiring;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.EmitterFactory;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MessagePublisherProvider;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.providers.AbstractMediator;
import io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration;
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.providers.wiring.Graph;
import io.smallrye.reactive.messaging.providers.wiring.TooManyDownstreamCandidatesException;
import io.smallrye.reactive.messaging.providers.wiring.TooManyUpstreamCandidatesException;
import io.smallrye.reactive.messaging.providers.wiring.UnsatisfiedBroadcastException;
import io.smallrye.reactive.messaging.providers.wiring.WiringException;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class Wiring {
    public static final int DEFAULT_BUFFER_SIZE = 128;
    @Inject
    @ConfigProperty(name="mp.messaging.emitter.default-buffer-size", defaultValue="128")
    int defaultBufferSize;
    @Inject
    @ConfigProperty(name="smallrye.messaging.emitter.default-buffer-size", defaultValue="128")
    @Deprecated
    int defaultBufferSizeLegacy;
    @Inject
    MediatorManager manager;
    @Any
    @Inject
    Instance<EmitterFactory<?>> emitterFactories;
    @Any
    @Inject
    Instance<SubscriberDecorator> subscriberDecorators;
    @Any
    @Inject
    Instance<PublisherDecorator> publisherDecorators;
    private final List<Component> components = new ArrayList<Component>();
    private Graph graph;
    private boolean strictMode;

    @PreDestroy
    public void terminateAllComponents() {
        this.components.forEach(Component::terminate);
    }

    public void prepare(boolean strictMode, ChannelRegistry registry, List<EmitterConfiguration> emitters, List<ChannelConfiguration> channels, List<MediatorConfiguration> mediators) {
        this.strictMode = strictMode;
        for (MediatorConfiguration mediatorConfiguration : mediators) {
            this.components.add(this.createMediatorComponent(mediatorConfiguration));
        }
        for (ChannelConfiguration channelConfiguration : channels) {
            this.components.add(new InjectedChannelComponent(channelConfiguration, strictMode));
        }
        for (EmitterConfiguration emitterConfiguration : emitters) {
            this.components.add(new EmitterComponent(emitterConfiguration, this.publisherDecorators, this.emitterFactories, this.defaultBufferSize, this.defaultBufferSizeLegacy));
        }
        for (Map.Entry entry : registry.getIncomingChannels().entrySet()) {
            this.components.add(this.getChannelConcurrency((String)entry.getKey()).map(c -> new InboundConnectorComponent((String)entry.getKey(), (Boolean)entry.getValue(), (int)c)).orElseGet(() -> new InboundConnectorComponent((String)entry.getKey(), (Boolean)entry.getValue())));
        }
        for (Map.Entry entry : registry.getOutgoingChannels().entrySet()) {
            this.components.add(new OutgoingConnectorComponent((String)entry.getKey(), this.subscriberDecorators, (Boolean)entry.getValue()));
        }
    }

    Map<String, Integer> getIncomingConcurrency(MediatorConfiguration mediator) {
        if (this.manager == null) {
            return Collections.emptyMap();
        }
        return this.manager.getIncomingConcurrency(mediator);
    }

    Optional<Integer> getChannelConcurrency(String channel) {
        if (this.manager == null) {
            return Optional.empty();
        }
        return this.manager.getChannelConcurrency(channel);
    }

    private MediatorComponent createMediatorComponent(MediatorConfiguration mediator) {
        Map<String, Integer> incomingConcurrency = this.getIncomingConcurrency(mediator);
        if (!mediator.getOutgoings().isEmpty() && !mediator.getIncoming().isEmpty()) {
            ProcessorMediatorComponent component = new ProcessorMediatorComponent(this.manager, mediator);
            return incomingConcurrency.isEmpty() ? component : new ProcessorConcurrentComponent(this.manager, mediator, component, incomingConcurrency);
        }
        if (!mediator.getOutgoings().isEmpty()) {
            return new PublisherMediatorComponent(this.manager, mediator);
        }
        SubscriberMediatorComponent component = new SubscriberMediatorComponent(this.manager, mediator);
        return incomingConcurrency.isEmpty() ? component : new SubscriberConcurrentComponent(this.manager, mediator, component, incomingConcurrency);
    }

    public Graph resolve() {
        ProviderLogging.log.startGraphResolution(this.components.size());
        long begin = System.nanoTime();
        LinkedHashSet<Component> resolved = new LinkedHashSet<Component>();
        LinkedHashSet<ConsumingComponent> unresolved = new LinkedHashSet<ConsumingComponent>();
        for (Component component : this.components) {
            if (component.isUpstreamResolved()) {
                resolved.add(component);
                continue;
            }
            unresolved.add((ConsumingComponent)component);
        }
        boolean doneOrStale = false;
        while (!doneOrStale) {
            ArrayList<ConsumingComponent> resolvedDuringThisTurn = new ArrayList<ConsumingComponent>();
            for (ConsumingComponent consumingComponent : unresolved) {
                List<String> incomings = consumingComponent.incomings();
                for (String string : incomings) {
                    List<Component> matches = this.getMatchesFor(string, resolved);
                    if (matches.isEmpty()) continue;
                    matches.forEach(m -> this.bind(consumingComponent, (Component)m));
                    if (!consumingComponent.isUpstreamResolved()) continue;
                    resolvedDuringThisTurn.add(consumingComponent);
                }
            }
            resolved.addAll(resolvedDuringThisTurn);
            resolvedDuringThisTurn.forEach(unresolved::remove);
            doneOrStale = resolvedDuringThisTurn.isEmpty() || unresolved.isEmpty();
            for (Component component : resolved) {
                if (!(component instanceof ConsumingComponent)) continue;
                ConsumingComponent cc = (ConsumingComponent)component;
                List<String> incomings = cc.incomings();
                for (String incoming2 : incomings) {
                    List<Component> matches = this.getMatchesFor(incoming2, resolved);
                    for (Component match : matches) {
                        this.bind(cc, match);
                    }
                }
            }
        }
        ArrayList<ConsumingComponent> newlyResolved = new ArrayList<ConsumingComponent>();
        for (ConsumingComponent consumingComponent : unresolved) {
            for (String incoming : consumingComponent.incomings()) {
                List<Component> list = this.getMatchesFor(incoming, unresolved);
                if (list.isEmpty()) continue;
                newlyResolved.add(consumingComponent);
                list.forEach(m -> this.bind(c, (Component)m));
            }
        }
        if (!newlyResolved.isEmpty()) {
            newlyResolved.forEach(unresolved::remove);
            resolved.addAll(newlyResolved);
        }
        this.graph = new Graph(this.strictMode, resolved, unresolved);
        long duration = System.nanoTime() - begin;
        ProviderLogging.log.completedGraphResolution(duration);
        return this.graph;
    }

    public Graph getGraph() {
        return this.graph;
    }

    private void bind(ConsumingComponent consumer, Component provider) {
        consumer.connectUpstream(provider);
        provider.connectDownstream(consumer);
    }

    private List<Component> getMatchesFor(String incoming, Set<? extends Component> candidates) {
        ArrayList<Component> matches = new ArrayList<Component>();
        for (Component component : candidates) {
            if (!component.outgoings().stream().anyMatch(s -> s.equalsIgnoreCase(incoming))) continue;
            matches.add(component);
        }
        return matches;
    }

    static abstract class MediatorComponent
    implements Component {
        final MediatorConfiguration configuration;
        final MediatorManager manager;

        protected MediatorComponent(MediatorManager manager, MediatorConfiguration configuration) {
            this.configuration = configuration;
            this.manager = manager;
        }
    }

    static class InjectedChannelComponent
    implements ConsumingComponent,
    NoDownstreamComponent {
        private final String name;
        private final Set<Component> upstreams = new LinkedHashSet<Component>();
        private final boolean strict;

        public InjectedChannelComponent(ChannelConfiguration configuration, boolean strict) {
            this.name = configuration.channelName;
            this.strict = strict;
        }

        @Override
        public List<String> incomings() {
            return Collections.singletonList(this.name);
        }

        @Override
        public boolean merge() {
            return !this.strict;
        }

        @Override
        public Set<Component> upstreams() {
            return this.upstreams;
        }

        public String toString() {
            return "@Channel{channel:'" + this.name + "'}";
        }

        @Override
        public void materialize(ChannelRegistry registry) {
        }

        @Override
        public void validate() throws WiringException {
            if (this.strict && this.upstreams().size() > 1) {
                throw new TooManyUpstreamCandidatesException(this);
            }
        }
    }

    static class EmitterComponent
    implements PublishingComponent,
    NoUpstreamComponent {
        private final EmitterConfiguration configuration;
        private final Instance<PublisherDecorator> decorators;
        private final Instance<EmitterFactory<?>> emitterFactories;
        private final Set<Component> downstreams = new LinkedHashSet<Component>();
        private final int defaultBufferSize;
        private final int defaultBufferSizeLegacy;

        public EmitterComponent(EmitterConfiguration configuration, Instance<PublisherDecorator> decorators, Instance<EmitterFactory<?>> emitterFactories, int defaultBufferSize, int defaultBufferSizeLegacy) {
            this.configuration = configuration;
            this.decorators = decorators;
            this.emitterFactories = emitterFactories;
            this.defaultBufferSize = defaultBufferSize;
            this.defaultBufferSizeLegacy = defaultBufferSizeLegacy;
        }

        @Override
        public Optional<String> outgoing() {
            return Optional.of(this.configuration.name());
        }

        @Override
        public List<String> outgoings() {
            return Collections.singletonList(this.configuration.name());
        }

        @Override
        public Set<Component> downstreams() {
            return this.downstreams;
        }

        @Override
        public boolean broadcast() {
            return this.configuration.broadcast();
        }

        @Override
        public int getRequiredNumberOfSubscribers() {
            return this.configuration.numberOfSubscriberBeforeConnecting();
        }

        public String toString() {
            return "Emitter{channel:'" + this.getOutgoingChannel() + "'}";
        }

        @Override
        public void materialize(ChannelRegistry registry) {
            int def = this.getDefaultBufferSize();
            this.registerEmitter(registry, def);
        }

        private <T extends MessagePublisherProvider<?>> void registerEmitter(ChannelRegistry registry, int def) {
            EmitterFactory<?> emitterFactory = this.getEmitterFactory(this.configuration.emitterType());
            MessagePublisherProvider emitter = emitterFactory.createEmitter(this.configuration, (long)def);
            Class type = this.configuration.emitterType().value();
            registry.register(this.configuration.name(), type, (Object)emitter);
            Multi publisher = Multi.createFrom().publisher(emitter.getPublisher());
            for (PublisherDecorator decorator : CDIUtils.getSortedInstances(this.decorators)) {
                publisher = decorator.decorate(publisher, List.of(this.configuration.name()), false);
            }
            registry.register(this.configuration.name(), (Flow.Publisher)publisher, this.broadcast());
        }

        private EmitterFactory<?> getEmitterFactory(EmitterFactoryFor emitterType) {
            return (EmitterFactory)this.emitterFactories.select(new Annotation[]{emitterType}).get();
        }

        private int getDefaultBufferSize() {
            if (this.defaultBufferSize == 128 && this.defaultBufferSizeLegacy != 128) {
                return this.defaultBufferSizeLegacy;
            }
            return this.defaultBufferSize;
        }

        @Override
        public void validate() throws WiringException {
            if (!this.configuration.broadcast() && this.downstreams().size() > 1) {
                throw new TooManyDownstreamCandidatesException(this);
            }
            if (this.broadcast() && this.getRequiredNumberOfSubscribers() != 0 && this.getRequiredNumberOfSubscribers() != this.downstreams.size()) {
                throw new UnsatisfiedBroadcastException(this);
            }
        }
    }

    public static interface Component {
        public void validate() throws WiringException;

        public boolean isUpstreamResolved();

        public boolean isDownstreamResolved();

        default public Optional<String> outgoing() {
            return Optional.empty();
        }

        default public List<String> outgoings() {
            return Collections.emptyList();
        }

        default public List<String> incomings() {
            return Collections.emptyList();
        }

        default public Set<Component> downstreams() {
            return Collections.emptySet();
        }

        default public Set<Component> upstreams() {
            return Collections.emptySet();
        }

        default public void connectDownstream(Component downstream) {
            throw new UnsupportedOperationException("Downstream connection not expected for " + this);
        }

        public void materialize(ChannelRegistry var1);

        default public void terminate() {
        }
    }

    static class OutgoingConnectorComponent
    implements ConsumingComponent,
    NoDownstreamComponent {
        private final String name;
        private final Set<Component> upstreams = new LinkedHashSet<Component>();
        private final Instance<SubscriberDecorator> subscriberDecorators;
        private final boolean merge;

        public OutgoingConnectorComponent(String name, Instance<SubscriberDecorator> subscriberDecorators, boolean merge) {
            this.name = name;
            this.subscriberDecorators = subscriberDecorators;
            this.merge = merge;
        }

        @Override
        public List<String> incomings() {
            return Collections.singletonList(this.name);
        }

        @Override
        public boolean merge() {
            return this.merge;
        }

        @Override
        public void connectUpstream(Component upstream) {
            this.upstreams.add(upstream);
        }

        @Override
        public Set<Component> upstreams() {
            return this.upstreams;
        }

        public String toString() {
            return "OutgoingConnector{channel:'" + this.name + "', attribute:'mp.messaging.outgoing." + this.name + "'}";
        }

        @Override
        public void materialize(ChannelRegistry registry) {
            List publishers = registry.getPublishers(this.name);
            Multi merged = publishers.size() == 1 ? MultiUtils.publisher((Flow.Publisher)publishers.get(0)) : Multi.createBy().merging().streams((Iterable)publishers.stream().map(p -> p).collect(Collectors.toList()));
            Flow.Subscriber connector = (Flow.Subscriber)registry.getSubscribers(this.name).get(0);
            for (SubscriberDecorator decorator : CDIUtils.getSortedInstances(this.subscriberDecorators)) {
                merged = decorator.decorate(merged, Collections.singletonList(this.name), true);
            }
            merged.subscribe().withSubscriber(connector);
        }

        @Override
        public void validate() throws WiringException {
            if (this.upstreams().size() > 1 && !this.merge) {
                throw new TooManyUpstreamCandidatesException(this);
            }
        }
    }

    static class ProcessorMediatorComponent
    extends MediatorComponent
    implements ConsumingComponent,
    PublishingComponent {
        private final Set<Component> upstreams = new LinkedHashSet<Component>();
        private final Set<Component> downstreams = new LinkedHashSet<Component>();
        private AbstractMediator mediator;

        protected ProcessorMediatorComponent(MediatorManager manager, MediatorConfiguration configuration) {
            super(manager, configuration);
        }

        @Override
        public Set<Component> upstreams() {
            return this.upstreams;
        }

        @Override
        public List<String> incomings() {
            return this.configuration.getIncoming();
        }

        @Override
        public boolean merge() {
            return this.configuration.getMerge() != null;
        }

        @Override
        public Optional<String> outgoing() {
            return Optional.of(this.configuration.getOutgoing());
        }

        @Override
        public List<String> outgoings() {
            return this.configuration.getOutgoings();
        }

        @Override
        public Set<Component> downstreams() {
            return this.downstreams;
        }

        @Override
        public boolean broadcast() {
            return this.configuration.getBroadcast() || this.outgoings().size() > 1;
        }

        @Override
        public int getRequiredNumberOfSubscribers() {
            if (this.outgoings().size() > 1) {
                return Math.max(this.configuration.getNumberOfSubscriberBeforeConnecting(), this.downstreams().size());
            }
            return this.configuration.getNumberOfSubscriberBeforeConnecting();
        }

        private boolean hasAllUpstreams() {
            for (String incoming : this.incomings()) {
                if (!this.upstreams().stream().noneMatch(c -> c.outgoings().contains(incoming))) continue;
                return false;
            }
            return true;
        }

        @Override
        public boolean isUpstreamResolved() {
            return this.hasAllUpstreams();
        }

        public String toString() {
            return "ProcessingMethod{method:'" + this.configuration.methodAsString() + "', incoming:'" + String.join((CharSequence)",", this.configuration.getIncoming()) + "', outgoing:'" + String.join((CharSequence)",", this.configuration.getOutgoings()) + "'}";
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void materialize(ChannelRegistry registry) {
            ProcessorMediatorComponent processorMediatorComponent = this;
            synchronized (processorMediatorComponent) {
                this.mediator = this.manager.createMediator(this.configuration);
            }
            boolean concat = this.configuration.getMerge() == Merge.Mode.CONCAT;
            boolean one = this.configuration.getMerge() == Merge.Mode.ONE;
            ArrayList publishers = new ArrayList();
            for (String channel : this.configuration.getIncoming()) {
                publishers.addAll(registry.getPublishers(channel));
            }
            Object aggregates = publishers.size() == 1 ? MultiUtils.publisher((Flow.Publisher)publishers.get(0)) : (concat ? Multi.createBy().concatenating().streams((Iterable)publishers.stream().map(p -> p).collect(Collectors.toList())) : (one ? MultiUtils.publisher((Flow.Publisher)publishers.get(0)) : Multi.createBy().merging().streams((Iterable)publishers.stream().map(p -> p).collect(Collectors.toList()))));
            this.mediator.connectToUpstream((Multi<? extends Message<?>>)aggregates);
            if (this.outgoings().size() > 1) {
                for (String outgoing : this.configuration.getOutgoings()) {
                    registry.register(outgoing, this.mediator.getStream(outgoing), this.broadcast());
                }
            } else {
                registry.register(this.getOutgoingChannel(), this.mediator.getStream(), this.broadcast());
            }
        }

        @Override
        public void validate() throws WiringException {
            for (String incoming : this.incomings()) {
                List<Component> components = this.downstreams().stream().filter(c -> c.outgoings().contains(incoming)).collect(Collectors.toList());
                if (components.size() <= 1 || this.merge()) continue;
                throw new TooManyUpstreamCandidatesException(this, incoming, components);
            }
            if (!this.merge() && this.upstreams.size() != this.incomings().size()) {
                throw new TooManyUpstreamCandidatesException(this);
            }
            if (!this.broadcast() && this.downstreams().size() > 1) {
                throw new TooManyDownstreamCandidatesException(this);
            }
            if (this.broadcast() && this.getRequiredNumberOfSubscribers() != 0 && this.getRequiredNumberOfSubscribers() != this.downstreams.size()) {
                throw new UnsatisfiedBroadcastException(this);
            }
        }

        @Override
        public synchronized void terminate() {
            if (this.mediator != null) {
                this.mediator.terminate();
            }
        }
    }

    static class ProcessorConcurrentComponent
    extends ConcurrentComponent<ProcessorMediatorComponent>
    implements PublishingComponent {
        protected ProcessorConcurrentComponent(MediatorManager manager, MediatorConfiguration configuration, ProcessorMediatorComponent mediator, Map<String, Integer> incomingConcurrency) {
            super(manager, configuration, mediator, incomingConcurrency);
        }

        @Override
        public void materialize(ChannelRegistry registry) {
            ArrayList<AbstractMediator> mediators = new ArrayList<AbstractMediator>();
            for (String incoming : this.configuration.getIncoming()) {
                for (Flow.Publisher publisher : registry.getPublishers(incoming)) {
                    AbstractMediator mediator = this.manager.createMediator(this.configuration);
                    mediator.connectToUpstream(MultiUtils.publisher(publisher));
                    mediators.add(mediator);
                }
            }
            if (((ProcessorMediatorComponent)this.delegate).outgoings().size() > 1) {
                for (String outgoing : this.configuration.getOutgoings()) {
                    List streams = mediators.stream().map(m -> m.getStream(outgoing).broadcast().toAllSubscribers()).collect(Collectors.toList());
                    Multi aggregates = Multi.createBy().merging().streams((Iterable)streams.stream().map(p -> p).collect(Collectors.toList()));
                    registry.register(outgoing, (Flow.Publisher)aggregates, ((ProcessorMediatorComponent)this.delegate).broadcast());
                }
            } else {
                List streams = mediators.stream().map(AbstractMediator::getStream).collect(Collectors.toList());
                Multi aggregates = Multi.createBy().merging().streams((Iterable)streams.stream().map(p -> p).collect(Collectors.toList()));
                registry.register(((ProcessorMediatorComponent)this.delegate).getOutgoingChannel(), (Flow.Publisher)aggregates, ((ProcessorMediatorComponent)this.delegate).broadcast());
            }
        }

        @Override
        public boolean broadcast() {
            return ((ProcessorMediatorComponent)this.delegate).broadcast();
        }

        @Override
        public int getRequiredNumberOfSubscribers() {
            return ((ProcessorMediatorComponent)this.delegate).getRequiredNumberOfSubscribers();
        }

        @Override
        public void connectDownstream(Component downstream) {
            ((ProcessorMediatorComponent)this.delegate).connectDownstream(downstream);
        }

        public String toString() {
            return "ProcessingMethod{method:'" + this.configuration.methodAsString() + "', incoming:'" + String.join((CharSequence)",", this.configuration.getIncoming()) + "', outgoing:'" + String.join((CharSequence)",", this.configuration.getOutgoings()) + "', concurrency:'" + this.concurrency + "'}";
        }
    }

    static class PublisherMediatorComponent
    extends MediatorComponent
    implements PublishingComponent,
    NoUpstreamComponent {
        private final Set<Component> downstreams = new LinkedHashSet<Component>();
        private AbstractMediator mediator;

        protected PublisherMediatorComponent(MediatorManager manager, MediatorConfiguration configuration) {
            super(manager, configuration);
        }

        @Override
        public List<String> outgoings() {
            return this.configuration.getOutgoings();
        }

        @Override
        public Optional<String> outgoing() {
            return Optional.of(this.configuration.getOutgoing());
        }

        @Override
        public Set<Component> downstreams() {
            return this.downstreams;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void materialize(ChannelRegistry registry) {
            PublisherMediatorComponent publisherMediatorComponent = this;
            synchronized (publisherMediatorComponent) {
                this.mediator = this.manager.createMediator(this.configuration);
            }
            if (this.outgoings().size() > 1) {
                for (String outgoing : this.configuration.getOutgoings()) {
                    registry.register(outgoing, this.mediator.getStream(outgoing), this.broadcast());
                }
            } else {
                registry.register(this.getOutgoingChannel(), this.mediator.getStream(), this.broadcast());
            }
        }

        @Override
        public boolean broadcast() {
            return this.configuration.getBroadcast() || this.outgoings().size() > 1;
        }

        @Override
        public int getRequiredNumberOfSubscribers() {
            if (this.outgoings().size() > 1) {
                return Math.max(this.configuration.getNumberOfSubscriberBeforeConnecting(), this.downstreams().size());
            }
            return this.configuration.getNumberOfSubscriberBeforeConnecting();
        }

        public String toString() {
            return "PublisherMethod{method:'" + this.configuration.methodAsString() + "', outgoing:'" + this.getOutgoingChannel() + "'}";
        }

        @Override
        public void validate() throws WiringException {
            if (!this.broadcast() && this.downstreams().size() > 1) {
                throw new TooManyDownstreamCandidatesException(this);
            }
            if (this.broadcast() && this.getRequiredNumberOfSubscribers() != 0 && this.getRequiredNumberOfSubscribers() != this.downstreams.size()) {
                throw new UnsatisfiedBroadcastException(this);
            }
        }

        @Override
        public synchronized void terminate() {
            if (this.mediator != null) {
                this.mediator.terminate();
            }
        }
    }

    static class SubscriberMediatorComponent
    extends MediatorComponent
    implements ConsumingComponent,
    NoDownstreamComponent {
        private final Set<Component> upstreams = new LinkedHashSet<Component>();
        private AbstractMediator mediator;

        protected SubscriberMediatorComponent(MediatorManager manager, MediatorConfiguration configuration) {
            super(manager, configuration);
        }

        @Override
        public Set<Component> upstreams() {
            return this.upstreams;
        }

        @Override
        public List<String> incomings() {
            return this.configuration.getIncoming();
        }

        @Override
        public boolean merge() {
            return this.configuration.getMerge() != null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void materialize(ChannelRegistry registry) {
            SubscriberMediatorComponent subscriberMediatorComponent = this;
            synchronized (subscriberMediatorComponent) {
                this.mediator = this.manager.createMediator(this.configuration);
            }
            boolean concat = this.configuration.getMerge() == Merge.Mode.CONCAT;
            boolean one = this.configuration.getMerge() == Merge.Mode.ONE;
            ArrayList publishers = new ArrayList();
            for (String channel : this.configuration.getIncoming()) {
                publishers.addAll(registry.getPublishers(channel));
            }
            Object aggregates = publishers.size() == 1 ? MultiUtils.publisher((Flow.Publisher)publishers.get(0)) : (concat ? Multi.createBy().concatenating().streams((Iterable)publishers.stream().map(p -> p).collect(Collectors.toList())) : (one ? MultiUtils.publisher((Flow.Publisher)publishers.get(0)) : Multi.createBy().merging().streams((Iterable)publishers.stream().map(p -> p).collect(Collectors.toList()))));
            this.mediator.connectToUpstream((Multi<? extends Message<?>>)aggregates);
            Flow.Subscriber<Message<?>> subscriber = this.mediator.getComputedSubscriber();
            this.incomings().forEach(s -> registry.register(s, subscriber, this.merge()));
            this.mediator.run();
        }

        public String toString() {
            return "SubscriberMethod{method:'" + this.configuration.methodAsString() + "', incoming:'" + String.join((CharSequence)",", this.configuration.getIncoming()) + "'}";
        }

        private boolean hasAllUpstreams() {
            for (String incoming : this.incomings()) {
                if (!this.upstreams().stream().noneMatch(c -> c.outgoings().contains(incoming))) continue;
                return false;
            }
            return true;
        }

        @Override
        public boolean isUpstreamResolved() {
            return this.hasAllUpstreams();
        }

        @Override
        public void validate() throws WiringException {
            for (String incoming : this.incomings()) {
                List<Component> components = this.downstreams().stream().filter(c -> c.outgoings().contains(incoming)).collect(Collectors.toList());
                if (components.size() <= 1 || this.merge()) continue;
                throw new TooManyUpstreamCandidatesException(this, incoming, components);
            }
            if (!this.merge() && this.upstreams.size() != this.incomings().size()) {
                throw new TooManyUpstreamCandidatesException(this);
            }
        }

        @Override
        public synchronized void terminate() {
            if (this.mediator != null) {
                this.mediator.terminate();
            }
        }
    }

    static class SubscriberConcurrentComponent
    extends ConcurrentComponent<SubscriberMediatorComponent> {
        protected SubscriberConcurrentComponent(MediatorManager manager, MediatorConfiguration configuration, SubscriberMediatorComponent mediator, Map<String, Integer> incomingConcurrency) {
            super(manager, configuration, mediator, incomingConcurrency);
        }

        @Override
        public void materialize(ChannelRegistry registry) {
            for (String incoming : this.configuration.getIncoming()) {
                List publishers = registry.getPublishers(incoming);
                for (Flow.Publisher publisher : publishers) {
                    AbstractMediator mediator = this.manager.createMediator(this.configuration);
                    mediator.connectToUpstream(MultiUtils.publisher(publisher));
                    Flow.Subscriber<Message<?>> subscriber = mediator.getComputedSubscriber();
                    registry.register(incoming, subscriber, this.merge());
                    mediator.run();
                }
            }
        }

        public String toString() {
            return "SubscriberMethod{method:'" + this.configuration.methodAsString() + "', incoming:'" + String.join((CharSequence)",", this.configuration.getIncoming()) + "', concurrency: '" + this.concurrency + "'}";
        }
    }

    public static interface ConsumingComponent
    extends Component {
        @Override
        default public boolean isUpstreamResolved() {
            return !this.upstreams().isEmpty();
        }

        default public void connectUpstream(Component upstream) {
            this.upstreams().add(upstream);
        }

        public boolean merge();
    }

    static class InboundConnectorComponent
    implements PublishingComponent,
    NoUpstreamComponent {
        private final String name;
        private final boolean broadcast;
        private final int concurrency;
        private final Set<Component> downstreams = new LinkedHashSet<Component>();

        public InboundConnectorComponent(String name, boolean broadcast) {
            this(name, broadcast, 0);
        }

        public InboundConnectorComponent(String name, boolean broadcast, int concurrency) {
            this.name = name;
            this.broadcast = broadcast;
            this.concurrency = concurrency;
        }

        @Override
        public Optional<String> outgoing() {
            return Optional.of(this.name);
        }

        @Override
        public List<String> outgoings() {
            return Collections.singletonList(this.name);
        }

        @Override
        public Set<Component> downstreams() {
            return this.downstreams;
        }

        @Override
        public void materialize(ChannelRegistry registry) {
        }

        @Override
        public boolean broadcast() {
            return this.broadcast;
        }

        @Override
        public int getRequiredNumberOfSubscribers() {
            return 0;
        }

        public String toString() {
            return "IncomingConnector{channel:'" + this.name + "', attribute:'mp.messaging.incoming." + this.name + "'" + (String)(this.concurrency > 0 ? " , concurrency:'" + this.concurrency + "'}" : "}");
        }

        @Override
        public void validate() throws WiringException {
            if (!this.broadcast() && this.downstreams().size() > 1) {
                throw new TooManyDownstreamCandidatesException(this);
            }
        }
    }

    static abstract class ConcurrentComponent<T extends MediatorComponent>
    extends MediatorComponent
    implements ConsumingComponent {
        protected final T delegate;
        protected final Map<String, Integer> concurrency;

        protected ConcurrentComponent(MediatorManager manager, MediatorConfiguration configuration, T mediator, Map<String, Integer> incomingConcurrency) {
            super(manager, configuration);
            this.delegate = mediator;
            this.concurrency = incomingConcurrency;
        }

        @Override
        public Set<Component> upstreams() {
            return this.delegate.upstreams();
        }

        @Override
        public Set<Component> downstreams() {
            return this.delegate.downstreams();
        }

        @Override
        public List<String> incomings() {
            return this.delegate.incomings();
        }

        @Override
        public List<String> outgoings() {
            return this.delegate.outgoings();
        }

        @Override
        public void validate() throws WiringException {
            this.delegate.validate();
        }

        @Override
        public boolean isDownstreamResolved() {
            return this.delegate.isDownstreamResolved();
        }

        @Override
        public boolean isUpstreamResolved() {
            return this.delegate.isUpstreamResolved();
        }

        @Override
        public void terminate() {
            this.delegate.terminate();
        }

        @Override
        public boolean merge() {
            return this.configuration.getMerge() != null;
        }
    }

    static interface NoDownstreamComponent
    extends Component {
        @Override
        default public boolean isDownstreamResolved() {
            return true;
        }
    }

    static interface NoUpstreamComponent
    extends Component {
        @Override
        default public boolean isUpstreamResolved() {
            return true;
        }
    }

    public static interface PublishingComponent
    extends Component {
        public boolean broadcast();

        public int getRequiredNumberOfSubscribers();

        default public String getOutgoingChannel() {
            return String.join((CharSequence)",", this.outgoings());
        }

        @Override
        default public boolean isDownstreamResolved() {
            return this.outgoings().stream().allMatch(o -> this.downstreams().stream().anyMatch(c -> c.incomings().contains(o)));
        }

        @Override
        default public void connectDownstream(Component downstream) {
            this.downstreams().add(downstream);
        }
    }
}

