/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.camel;

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.camel.CamelConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.camel.CamelConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.camel.CamelFailStop;
import io.smallrye.reactive.messaging.camel.CamelFailureHandler;
import io.smallrye.reactive.messaging.camel.CamelIgnoreFailure;
import io.smallrye.reactive.messaging.camel.CamelMessage;
import io.smallrye.reactive.messaging.camel.OutgoingExchangeMetadata;
import io.smallrye.reactive.messaging.camel.i18n.CamelExceptions;
import io.smallrye.reactive.messaging.camel.i18n.CamelLogging;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory;
import org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultExchange;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Connector(value="smallrye-camel")
@ConnectorAttributes(value={@ConnectorAttribute(name="endpoint-uri", description="The URI of the Camel endpoint (read from or written to)", mandatory=true, type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING), @ConnectorAttribute(name="failure-strategy", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="Specify the failure strategy to apply when a message produced from a Camel exchange is nacked. Values can be `fail` (default) or `ignore`", defaultValue="fail")})
public class CamelConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    private static final String REACTIVE_STREAMS_SCHEME = "reactive-streams:";
    public static final String CONNECTOR_NAME = "smallrye-camel";
    @Inject
    private CamelContext camel;
    private CamelReactiveStreamsService reactive;

    @Produces
    public CamelReactiveStreamsService getCamelReactive() {
        return this.reactive;
    }

    @PostConstruct
    @Inject
    public void init() {
        DefaultCamelReactiveStreamsServiceFactory factory = new DefaultCamelReactiveStreamsServiceFactory();
        ReactiveStreamsEngineConfiguration configuration = new ReactiveStreamsEngineConfiguration();
        Config config = ConfigProvider.getConfig();
        config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-max-size", Integer.class).ifPresent(arg_0 -> ((ReactiveStreamsEngineConfiguration)configuration).setThreadPoolMaxSize(arg_0));
        config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-min-size", Integer.class).ifPresent(arg_0 -> ((ReactiveStreamsEngineConfiguration)configuration).setThreadPoolMinSize(arg_0));
        config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-name", String.class).ifPresent(arg_0 -> ((ReactiveStreamsEngineConfiguration)configuration).setThreadPoolName(arg_0));
        this.reactive = factory.newInstance(this.camel, configuration);
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        Publisher publisher;
        CamelConnectorIncomingConfiguration ic = new CamelConnectorIncomingConfiguration(config);
        String name = ic.getEndpointUri();
        CamelFailureHandler.Strategy strategy = CamelFailureHandler.Strategy.from(ic.getFailureStrategy());
        CamelFailureHandler onNack = this.createFailureHandler(strategy, ic.getChannel());
        if (name.startsWith(REACTIVE_STREAMS_SCHEME)) {
            name = name.substring(REACTIVE_STREAMS_SCHEME.length());
            CamelLogging.log.creatingPublisherFromStream(name);
            publisher = this.reactive.fromStream(name);
        } else {
            CamelLogging.log.creatingPublisherFromEndpoint(name);
            publisher = this.reactive.from(name);
        }
        return ReactiveStreams.fromPublisher((Publisher)publisher).map(ex -> new CamelMessage((Exchange)ex, onNack));
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        SubscriberBuilder subscriber;
        String name = new CamelConnectorOutgoingConfiguration(config).getEndpointUri();
        if (name.startsWith(REACTIVE_STREAMS_SCHEME)) {
            name = name.substring(REACTIVE_STREAMS_SCHEME.length());
            CamelLogging.log.creatingSubscriberFromStream(name);
            subscriber = ReactiveStreams.builder().map(this::createExchangeFromMessage).to(this.reactive.streamSubscriber(name));
        } else {
            CamelLogging.log.creatingSubscriberFromEndpoint(name);
            subscriber = ReactiveStreams.builder().map(this::createExchangeFromMessage).to(this.reactive.subscriber(name));
        }
        return subscriber;
    }

    private Exchange createExchangeFromMessage(final Message<?> message) {
        if (message.getPayload() instanceof Exchange) {
            return (Exchange)message.getPayload();
        }
        OutgoingExchangeMetadata metadata = message.getMetadata(OutgoingExchangeMetadata.class).orElse(null);
        DefaultExchange exchange = new DefaultExchange(this.camel);
        if (metadata != null) {
            metadata.getProperties().forEach((arg_0, arg_1) -> ((DefaultExchange)exchange).setProperty(arg_0, arg_1));
            if (metadata.getExchangePattern() != null) {
                exchange.setPattern(metadata.getExchangePattern());
            }
        }
        exchange.getIn().setBody(message.getPayload());
        exchange.addOnCompletion(new Synchronization(){

            public void onComplete(Exchange exchange) {
                message.ack();
            }

            public void onFailure(Exchange exchange) {
                CamelLogging.log.exchangeFailed(exchange.getException());
                message.nack((Throwable)exchange.getException());
            }
        });
        return exchange;
    }

    private CamelFailureHandler createFailureHandler(CamelFailureHandler.Strategy strategy, String channel) {
        switch (strategy) {
            case IGNORE: {
                return new CamelIgnoreFailure(channel);
            }
            case FAIL: {
                return new CamelFailStop(channel);
            }
        }
        throw CamelExceptions.ex.illegalArgumentUnknownStrategy(strategy);
    }
}

