/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spring.data.firestore.util;

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public final class ObservableReactiveUtil {
    private ObservableReactiveUtil() {
    }

    public static <R> Mono<R> unaryCall(Consumer<StreamObserver<R>> remoteCall) {
        return Mono.create(sink -> remoteCall.accept(new UnaryStreamObserver((MonoSink)sink)));
    }

    public static <R> Flux<R> streamingCall(Consumer<StreamObserver<R>> remoteCall) {
        return Flux.create(sink -> {
            StreamingObserver observer = new StreamingObserver(sink);
            remoteCall.accept((StreamObserver)observer);
            sink.onRequest(observer::request);
        });
    }

    private static class UnaryStreamObserver<R>
    implements StreamObserver<R> {
        private boolean terminalEventReceived;
        private final MonoSink sink;

        UnaryStreamObserver(MonoSink sink) {
            this.sink = sink;
        }

        public void onNext(R response) {
            this.terminalEventReceived = true;
            this.sink.success(response);
        }

        public void onError(Throwable throwable) {
            this.terminalEventReceived = true;
            this.sink.error(throwable);
        }

        public void onCompleted() {
            if (!this.terminalEventReceived) {
                this.sink.error((Throwable)new RuntimeException("Unary gRPC call completed without yielding a value or an error"));
            }
        }
    }

    static class StreamingObserver<Q, R>
    implements ClientResponseObserver<Q, R> {
        ClientCallStreamObserver<Q> rsObserver;
        FluxSink<R> sink;

        StreamingObserver(FluxSink<R> sink) {
            this.sink = sink;
        }

        public void onNext(R value) {
            this.sink.next(value);
        }

        public void onError(Throwable throwable) {
            this.sink.error(throwable);
        }

        public void onCompleted() {
            this.sink.complete();
        }

        public void beforeStart(ClientCallStreamObserver<Q> requestStream) {
            this.rsObserver = requestStream;
            requestStream.disableAutoInboundFlowControl();
            this.sink.onCancel(() -> requestStream.cancel("Flux requested cancel.", null));
        }

        void request(long n) {
            this.rsObserver.request(n > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)n);
        }
    }
}

