/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.grpc.stubs;

import io.grpc.stub.StreamObserver;
import io.quarkus.grpc.stubs.MultiStreamObserver;
import io.quarkus.grpc.stubs.UniStreamObserver;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.UniEmitter;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class ClientCalls {
    private ClientCalls() {
    }

    public static <I, O> Uni<O> oneToOne(final I request, final BiConsumer<I, StreamObserver<O>> delegate) {
        return Uni.createFrom().emitter(new Consumer<UniEmitter<? super O>>(){

            @Override
            public void accept(UniEmitter<? super O> emitter) {
                delegate.accept(request, new UniStreamObserver(emitter));
            }
        });
    }

    public static <I, O> Multi<O> oneToMany(final I request, final BiConsumer<I, StreamObserver<O>> delegate) {
        return Multi.createFrom().emitter(new Consumer<MultiEmitter<? super O>>(){

            @Override
            public void accept(MultiEmitter<? super O> emitter) {
                delegate.accept(request, new MultiStreamObserver(emitter));
            }
        });
    }

    public static <I, O> Uni<O> manyToOne(final Multi<I> items, final Function<StreamObserver<O>, StreamObserver<I>> delegate) {
        return Uni.createFrom().emitter(new Consumer<UniEmitter<? super O>>(){

            @Override
            public void accept(UniEmitter<? super O> emitter) {
                AtomicReference<Flow.Subscription> cancellable = new AtomicReference<Flow.Subscription>();
                UniStreamObserver observer = new UniStreamObserver(emitter.onTermination(() -> {
                    Flow.Subscription subscription = cancellable.getAndSet(Subscriptions.CANCELLED);
                    if (subscription != null) {
                        subscription.cancel();
                    }
                }));
                StreamObserver request = (StreamObserver)delegate.apply(observer);
                ClientCalls.subscribeToUpstreamAndForwardToStreamObserver(items, cancellable, request);
            }
        });
    }

    private static <I> void subscribeToUpstreamAndForwardToStreamObserver(Multi<I> items, final AtomicReference<Flow.Subscription> cancellable, final StreamObserver<I> request) {
        items.subscribe().with(new Consumer<Flow.Subscription>(){

            @Override
            public void accept(Flow.Subscription subscription) {
                if (!cancellable.compareAndSet(null, subscription)) {
                    subscription.cancel();
                } else {
                    subscription.request(Long.MAX_VALUE);
                }
            }
        }, new Consumer<I>(){

            @Override
            public void accept(I v) {
                if (cancellable.get() != null && cancellable.get() != Subscriptions.CANCELLED) {
                    request.onNext(v);
                }
            }
        }, new Consumer<Throwable>(){

            @Override
            public void accept(Throwable throwable) {
                request.onError(throwable);
            }
        }, new Runnable(){

            @Override
            public void run() {
                request.onCompleted();
            }
        });
    }

    public static <I, O> Multi<O> manyToMany(final Multi<I> items, final Function<StreamObserver<O>, StreamObserver<I>> delegate) {
        return Multi.createFrom().emitter(new Consumer<MultiEmitter<? super O>>(){

            @Override
            public void accept(MultiEmitter<? super O> emitter) {
                AtomicReference<Flow.Subscription> cancellable = new AtomicReference<Flow.Subscription>();
                StreamObserver request = (StreamObserver)delegate.apply(new MultiStreamObserver(emitter.onTermination(() -> {
                    Flow.Subscription subscription = cancellable.getAndSet(Subscriptions.CANCELLED);
                    if (subscription != null) {
                        subscription.cancel();
                    }
                })));
                ClientCalls.subscribeToUpstreamAndForwardToStreamObserver(items, cancellable, request);
            }
        });
    }
}

