/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.camel;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.camel.CamelConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.camel.CamelConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.camel.CamelFailStop;
import io.smallrye.reactive.messaging.camel.CamelFailureHandler;
import io.smallrye.reactive.messaging.camel.CamelIgnoreFailure;
import io.smallrye.reactive.messaging.camel.CamelMessage;
import io.smallrye.reactive.messaging.camel.IncomingExchangeMetadata;
import io.smallrye.reactive.messaging.camel.OutgoingExchangeMetadata;
import io.smallrye.reactive.messaging.camel.i18n.CamelExceptions;
import io.smallrye.reactive.messaging.camel.i18n.CamelLogging;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.concurrent.Flow;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory;
import org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultExchange;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

@ApplicationScoped
@Connector(value="smallrye-camel")
@ConnectorAttributes(value={@ConnectorAttribute(name="endpoint-uri", description="The URI of the Camel endpoint (read from or written to)", mandatory=true, type="string", direction=ConnectorAttribute.Direction.INCOMING_AND_OUTGOING), @ConnectorAttribute(name="failure-strategy", type="string", direction=ConnectorAttribute.Direction.INCOMING, description="Specify the failure strategy to apply when a message produced from a Camel exchange is nacked. Values can be `fail` (default) or `ignore`", defaultValue="fail"), @ConnectorAttribute(name="merge", direction=ConnectorAttribute.Direction.OUTGOING, description="Whether the connector should allow multiple upstreams", type="boolean", defaultValue="false")})
public class CamelConnector
implements InboundConnector,
OutboundConnector {
    private static final String REACTIVE_STREAMS_SCHEME = "reactive-streams:";
    public static final String CONNECTOR_NAME = "smallrye-camel";
    @Inject
    private CamelContext camel;
    private CamelReactiveStreamsService reactive;

    @Produces
    @ApplicationScoped
    public synchronized CamelReactiveStreamsService getCamelReactive() {
        if (this.reactive != null) {
            return this.reactive;
        }
        CamelReactiveStreamsService service = (CamelReactiveStreamsService)this.camel.hasService(CamelReactiveStreamsService.class);
        if (service != null) {
            CamelLogging.log.camelReactiveStreamsServiceAlreadyDefined();
            this.reactive = service;
            return service;
        }
        DefaultCamelReactiveStreamsServiceFactory factory = new DefaultCamelReactiveStreamsServiceFactory();
        ReactiveStreamsEngineConfiguration configuration = new ReactiveStreamsEngineConfiguration();
        Config config = ConfigProvider.getConfig();
        config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-max-size", Integer.class).ifPresent(arg_0 -> ((ReactiveStreamsEngineConfiguration)configuration).setThreadPoolMaxSize(arg_0));
        config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-min-size", Integer.class).ifPresent(arg_0 -> ((ReactiveStreamsEngineConfiguration)configuration).setThreadPoolMinSize(arg_0));
        config.getOptionalValue("camel.component.reactive-streams.internal-engine-configuration.thread-pool-name", String.class).ifPresent(arg_0 -> ((ReactiveStreamsEngineConfiguration)configuration).setThreadPoolName(arg_0));
        this.reactive = factory.newInstance(this.camel, configuration);
        try {
            this.camel.addService((Object)this.reactive, true, true);
        }
        catch (Exception e) {
            throw CamelExceptions.ex.unableToRegisterService(e);
        }
        return this.reactive;
    }

    public Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> getPublisher(Config config) {
        Flow.Publisher publisher;
        CamelConnectorIncomingConfiguration ic = new CamelConnectorIncomingConfiguration(config);
        String name = ic.getEndpointUri();
        CamelFailureHandler.Strategy strategy = CamelFailureHandler.Strategy.from(ic.getFailureStrategy());
        CamelFailureHandler onNack = this.createFailureHandler(strategy, ic.getChannel());
        if (name.startsWith(REACTIVE_STREAMS_SCHEME)) {
            name = name.substring(REACTIVE_STREAMS_SCHEME.length());
            CamelLogging.log.creatingPublisherFromStream(name);
            publisher = AdaptersToFlow.publisher((Publisher)this.getCamelReactive().fromStream(name));
        } else {
            CamelLogging.log.creatingPublisherFromEndpoint(name);
            publisher = AdaptersToFlow.publisher((Publisher)this.getCamelReactive().from(name));
        }
        return Multi.createFrom().publisher(publisher).map(ex -> new CamelMessage((Exchange)ex, onNack));
    }

    public Flow.Subscriber<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> getSubscriber(Config config) {
        Flow.Subscriber<org.eclipse.microprofile.reactive.messaging.Message<?>> subscriber;
        String name = new CamelConnectorOutgoingConfiguration(config).getEndpointUri();
        if (name.startsWith(REACTIVE_STREAMS_SCHEME)) {
            name = name.substring(REACTIVE_STREAMS_SCHEME.length());
            CamelLogging.log.creatingSubscriberFromStream(name);
            subscriber = this.getMessageSubscriber((Subscriber<Exchange>)this.getCamelReactive().streamSubscriber(name));
        } else {
            CamelLogging.log.creatingSubscriberFromEndpoint(name);
            subscriber = this.getMessageSubscriber((Subscriber<Exchange>)this.getCamelReactive().subscriber(name));
        }
        return subscriber;
    }

    private Flow.Subscriber<org.eclipse.microprofile.reactive.messaging.Message<?>> getMessageSubscriber(final Subscriber<Exchange> s) {
        return new Flow.Subscriber<org.eclipse.microprofile.reactive.messaging.Message<?>>(){

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                s.onSubscribe(AdaptersToReactiveStreams.subscription((Flow.Subscription)subscription));
            }

            @Override
            public void onNext(org.eclipse.microprofile.reactive.messaging.Message<?> item) {
                s.onNext((Object)CamelConnector.this.createExchangeFromMessage(item));
            }

            @Override
            public void onError(Throwable throwable) {
                s.onError(throwable);
            }

            @Override
            public void onComplete() {
                s.onComplete();
            }
        };
    }

    private Exchange createExchangeFromMessage(final org.eclipse.microprofile.reactive.messaging.Message<?> message) {
        if (message.getPayload() instanceof Exchange) {
            return (Exchange)message.getPayload();
        }
        OutgoingExchangeMetadata outGoingMetadata = message.getMetadata(OutgoingExchangeMetadata.class).orElse(null);
        IncomingExchangeMetadata incomingMetadata = message.getMetadata(IncomingExchangeMetadata.class).orElse(null);
        DefaultExchange exchange = new DefaultExchange(this.camel);
        if (outGoingMetadata != null) {
            outGoingMetadata.getProperties().forEach((x$0, x$1) -> exchange.setProperty(x$0, x$1));
            if (outGoingMetadata.getExchangePattern() != null) {
                exchange.setPattern(outGoingMetadata.getExchangePattern());
            }
            outGoingMetadata.getHeaders().forEach((arg_0, arg_1) -> ((Message)exchange.getIn()).setHeader(arg_0, arg_1));
        }
        if (incomingMetadata != null && incomingMetadata.getExchange().getIn() != null && incomingMetadata.getExchange().getIn().getHeaders() != null) {
            incomingMetadata.getExchange().getIn().getHeaders().forEach((arg_0, arg_1) -> ((Message)exchange.getIn()).setHeader(arg_0, arg_1));
        }
        exchange.getIn().setBody(message.getPayload());
        exchange.getExchangeExtension().addOnCompletion(new Synchronization(){

            public void onComplete(Exchange exchange) {
                message.ack();
            }

            public void onFailure(Exchange exchange) {
                CamelLogging.log.exchangeFailed(exchange.getException());
                message.nack((Throwable)exchange.getException());
            }
        });
        return exchange;
    }

    private CamelFailureHandler createFailureHandler(CamelFailureHandler.Strategy strategy, String channel) {
        switch (strategy) {
            case IGNORE: {
                return new CamelIgnoreFailure(channel);
            }
            case FAIL: {
                return new CamelFailStop(channel);
            }
        }
        throw CamelExceptions.ex.illegalArgumentUnknownStrategy(strategy);
    }
}

