/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.impl;

import io.smallrye.reactive.messaging.StreamRegistar;
import io.smallrye.reactive.messaging.StreamRegistry;
import io.smallrye.reactive.messaging.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.spi.IncomingConnectorFactory;
import io.smallrye.reactive.messaging.spi.OutgoingConnectorFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registers publishers and subscribers declared through MicroProfile Config.
 *
 * <p>Properties under {@code mp.messaging.incoming.<name>.*} describe sources and properties under
 * {@code mp.messaging.outgoing.<name>.*} describe sinks. For each declared name, the connector
 * factory whose {@code type().getName()} matches the stream's {@code type} property
 * (case-insensitively) builds the stream, which is then registered in the {@link StreamRegistry}
 * under that name.
 */
@ApplicationScoped
public class ConfiguredStreamFactory implements StreamRegistar {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredStreamFactory.class);
    private static final String SOURCE_CONFIG_PREFIX = "mp.messaging.incoming";
    private static final String SINK_CONFIG_PREFIX = "mp.messaging.outgoing";
    private final List<IncomingConnectorFactory> sourceFactories;
    private final List<OutgoingConnectorFactory> sinkFactories;
    private final Config config;
    private final StreamRegistry registry;

    /**
     * Creates an instance with every collaborator set to {@code null}.
     *
     * <p>NOTE(review): presumably present only so the container can proxy this
     * {@code @ApplicationScoped} bean - confirm. On such an instance {@link #initialize()} is a
     * no-op because {@code config} is {@code null}.
     */
    ConfiguredStreamFactory() {
        this.sourceFactories = null;
        this.sinkFactories = null;
        this.config = null;
        this.registry = null;
    }

    /**
     * Resolves the connector factories, the configuration and the stream registry.
     *
     * <p>When no {@link Config} bean is available the factory lists are left empty and
     * {@code config} stays {@code null}, which turns {@link #initialize()} into a no-op.
     *
     * @param sourceFactories the available incoming connector factories
     * @param sinkFactories the available outgoing connector factories
     * @param config the MicroProfile Config handle, possibly unsatisfied
     * @param registry the handle on the registry receiving the created streams
     * @throws IllegalStateException if the config handle is satisfied but yields no instance
     */
    @Inject
    public ConfiguredStreamFactory(@Any Instance<IncomingConnectorFactory> sourceFactories, @Any Instance<OutgoingConnectorFactory> sinkFactories, Instance<Config> config, @Any Instance<StreamRegistry> registry) {
        this.registry = registry.get();
        if (config.isUnsatisfied()) {
            this.sourceFactories = Collections.emptyList();
            this.sinkFactories = Collections.emptyList();
            this.config = null;
        } else {
            List<IncomingConnectorFactory> sources = sourceFactories.stream().collect(Collectors.toList());
            List<OutgoingConnectorFactory> sinks = sinkFactories.stream().collect(Collectors.toList());
            this.sourceFactories = sources;
            this.sinkFactories = sinks;
            // Log from the collected lists instead of iterating the Instance handles a second time.
            LOGGER.info("Found incoming connectors: {}", sources.stream().map(IncomingConnectorFactory::type).collect(Collectors.toList()));
            LOGGER.info("Found outgoing connectors: {}", sinks.stream().map(OutgoingConnectorFactory::type).collect(Collectors.toList()));
            this.config = config.stream().findFirst().orElseThrow(() -> new IllegalStateException("Unable to retrieve the config"));
        }
    }

    /**
     * Groups the properties found under {@code prefix} by stream name.
     *
     * <p>A property key {@code <prefix>.<name>} or {@code <prefix>.<name>.<attribute>} contributes
     * the entry {@code <name>}. Keys that do not start with {@code <prefix>.} are ignored; this
     * includes a key equal to the bare prefix and keys that merely share its leading characters.
     * Keys with an empty name segment are ignored as well.
     *
     * @param prefix the configuration prefix, without a trailing dot
     * @param root the configuration to scan
     * @return a new mutable map from stream name to the configuration created for that stream
     */
    static Map<String, ConnectorConfig> extractConfigurationFor(String prefix, Config root) {
        // Match on the dot-terminated prefix so that the bare prefix itself and look-alike keys
        // (e.g. "<prefix>foo.type") are never treated as stream declarations.
        String qualifiedPrefix = prefix + ".";
        Map<String, ConnectorConfig> configs = new HashMap<>();
        for (String key : root.getPropertyNames()) {
            if (!key.startsWith(qualifiedPrefix)) {
                continue;
            }
            String remainder = key.substring(qualifiedPrefix.length());
            int separator = remainder.indexOf('.');
            String name = separator < 0 ? remainder : remainder.substring(0, separator);
            if (name.isEmpty()) {
                // "<prefix>." or "<prefix>..x" carries no usable stream name.
                continue;
            }
            configs.put(name, new ConnectorConfig(qualifiedPrefix + name, root, name));
        }
        return configs;
    }

    /**
     * Creates and registers every configured source and sink.
     *
     * <p>Does nothing when no configuration was resolved at construction time. Any exception
     * raised while creating or registering a stream is logged and not rethrown; because a single
     * {@code try} block covers all registrations, the streams that would have been processed
     * after the failing one are not registered.
     */
    @Override
    public void initialize() {
        if (this.config == null) {
            LOGGER.info("No MicroProfile Config found, skipping");
            return;
        }
        LOGGER.info("Stream manager initializing...");
        Map<String, ConnectorConfig> sourceConfiguration = extractConfigurationFor(SOURCE_CONFIG_PREFIX, this.config);
        Map<String, ConnectorConfig> sinkConfiguration = extractConfigurationFor(SINK_CONFIG_PREFIX, this.config);
        try {
            sourceConfiguration.forEach((name, conf) -> this.registry.register(name, this.createPublisherBuilder(name, conf)));
            sinkConfiguration.forEach((name, conf) -> this.registry.register(name, this.createSubscriberBuilder(name, conf)));
        } catch (Exception e) {
            LOGGER.error("Unable to create the publisher or subscriber during initialization", e);
        }
    }

    /**
     * Builds the publisher for the source {@code name}.
     *
     * @param name the stream name, used in error messages
     * @param config the configuration of that stream; must provide a {@code type} property
     * @return the publisher builder produced by the matching incoming connector factory
     * @throws IllegalArgumentException if {@code type} is missing or matches no known factory
     */
    private PublisherBuilder<? extends Message> createPublisherBuilder(String name, Config config) {
        String type = config.getOptionalValue("type", String.class)
                .orElseThrow(() -> new IllegalArgumentException("Invalid source, no type for " + name));
        IncomingConnectorFactory mySourceFactory = this.sourceFactories.stream()
                .filter(factory -> factory.type().getName().equalsIgnoreCase(type))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown source type for " + name + ", supported types are "
                        + this.sourceFactories.stream().map(sf -> sf.type().getName()).collect(Collectors.toList())));
        return mySourceFactory.getPublisherBuilder(config);
    }

    /**
     * Builds the subscriber for the sink {@code name}.
     *
     * @param name the stream name, used in error messages
     * @param config the configuration of that stream; must provide a {@code type} property
     * @return the subscriber builder produced by the matching outgoing connector factory
     * @throws IllegalArgumentException if {@code type} is missing or matches no known factory
     */
    private SubscriberBuilder<? extends Message, Void> createSubscriberBuilder(String name, Config config) {
        String type = config.getOptionalValue("type", String.class)
                .orElseThrow(() -> new IllegalArgumentException("Invalid sink, no type for " + name));
        OutgoingConnectorFactory mySinkFactory = this.sinkFactories.stream()
                .filter(factory -> factory.type().getName().equalsIgnoreCase(type))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown sink type for " + name + ", supported types are "
                        + this.sinkFactories.stream().map(sf -> sf.type().getName()).collect(Collectors.toList())));
        return mySinkFactory.getSubscriberBuilder(config);
    }
}

