/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.memory;

import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import io.smallrye.reactive.messaging.memory.i18n.InMemoryExceptions;
import jakarta.enterprise.context.ApplicationScoped;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Connector(value="smallrye-in-memory")
public class InMemoryConnector
implements IncomingConnectorFactory,
OutgoingConnectorFactory {
    public static final String CONNECTOR = "smallrye-in-memory";
    private final Map<String, InMemorySourceImpl<?>> sources = new HashMap();
    private final Map<String, InMemorySinkImpl<?>> sinks = new HashMap();

    public static Map<String, String> switchIncomingChannelsToInMemory(String ... channels) {
        LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
        for (String channel : channels) {
            if (channel == null || channel.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String key = "mp.messaging.incoming." + channel + ".connector";
            properties.put(key, CONNECTOR);
            System.setProperty(key, CONNECTOR);
        }
        return properties;
    }

    public static Map<String, String> switchOutgoingChannelsToInMemory(String ... channels) {
        LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
        for (String channel : channels) {
            if (channel == null || channel.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String key = "mp.messaging.outgoing." + channel + ".connector";
            properties.put(key, CONNECTOR);
            System.setProperty(key, CONNECTOR);
        }
        return properties;
    }

    public static void clear() {
        List<String> list = System.getProperties().entrySet().stream().filter(entry -> CONNECTOR.equals(entry.getValue())).map(entry -> (String)entry.getKey()).collect(Collectors.toList());
        list.forEach(System::clearProperty);
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        String name = (String)config.getOptionalValue("channel-name", String.class).orElseThrow(InMemoryExceptions.ex::illegalArgumentInvalidIncomingConfig);
        boolean broadcast = config.getOptionalValue("broadcast", Boolean.class).orElse(false);
        return this.sources.computeIfAbsent((String)name, (Function<String, InMemorySourceImpl>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$getPublisherBuilder$2(boolean java.lang.String ), (Ljava/lang/String;)Lio/smallrye/reactive/messaging/memory/InMemoryConnector$InMemorySourceImpl;)((boolean)broadcast)).source;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        String name = (String)config.getOptionalValue("channel-name", String.class).orElseThrow(InMemoryExceptions.ex::illegalArgumentInvalidOutgoingConfig);
        return this.sinks.computeIfAbsent((String)name, (Function<String, InMemorySinkImpl>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$getSubscriberBuilder$3(java.lang.String ), (Ljava/lang/String;)Lio/smallrye/reactive/messaging/memory/InMemoryConnector$InMemorySinkImpl;)()).sink;
    }

    public <T> InMemorySource<T> source(String channel) {
        if (channel == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySourceImpl<?> source = this.sources.get(channel);
        if (source == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(channel);
        }
        return source;
    }

    public <T> InMemorySink<T> sink(String channel) {
        if (channel == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySink sink = this.sinks.get(channel);
        if (sink == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(channel);
        }
        return sink;
    }

    private static /* synthetic */ InMemorySinkImpl lambda$getSubscriberBuilder$3(String x$0) {
        return new InMemorySinkImpl(x$0);
    }

    private static /* synthetic */ InMemorySourceImpl lambda$getPublisherBuilder$2(boolean broadcast, String n) {
        return new InMemorySourceImpl(n, broadcast);
    }

    private static class InMemorySinkImpl<T>
    implements InMemorySink<T> {
        private final SubscriberBuilder<? extends Message<T>, Void> sink;
        private final List<Message<T>> list = new CopyOnWriteArrayList<Message<T>>();
        private final AtomicReference<Throwable> failure = new AtomicReference();
        private final AtomicBoolean completed = new AtomicBoolean();
        private final String name;

        private InMemorySinkImpl(String name) {
            this.name = name;
            this.sink = ReactiveStreams.builder().flatMapCompletionStage(m -> {
                this.list.add((Message<T>)m);
                return m.ack().thenApply(x -> m);
            }).onError(err -> this.failure.compareAndSet((Throwable)null, (Throwable)err)).onComplete(() -> this.completed.compareAndSet(false, true)).ignore();
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public List<? extends Message<T>> received() {
            return new ArrayList<Message<T>>(this.list);
        }

        @Override
        public void clear() {
            this.completed.set(false);
            this.failure.set(null);
            this.list.clear();
        }

        @Override
        public boolean hasCompleted() {
            return this.completed.get();
        }

        @Override
        public boolean hasFailed() {
            return this.getFailure() != null;
        }

        @Override
        public Throwable getFailure() {
            return this.failure.get();
        }
    }

    private static class InMemorySourceImpl<T>
    implements InMemorySource<T> {
        private final Flow.Processor<Message<T>, Message<T>> processor;
        private final PublisherBuilder<? extends Message<T>> source;
        private final String name;

        private InMemorySourceImpl(String name, boolean broadcast) {
            this.name = name;
            this.processor = broadcast ? BroadcastProcessor.create() : UnicastProcessor.create();
            this.source = ReactiveStreams.fromPublisher((Publisher)AdaptersToReactiveStreams.publisher(this.processor));
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public InMemorySource<T> send(T messageOrPayload) {
            if (messageOrPayload instanceof Message) {
                this.processor.onNext((Message)messageOrPayload);
            } else {
                this.processor.onNext(Message.of(messageOrPayload));
            }
            return this;
        }

        @Override
        public void complete() {
            this.processor.onComplete();
        }

        @Override
        public void fail(Throwable failure) {
            this.processor.onError(failure);
        }
    }
}

