/*
 * Decompiled with CFR 0.152.
 */
package grpcstarter.extensions.transcoding;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import grpcstarter.extensions.transcoding.GrpcTranscodingProperties;
import grpcstarter.extensions.transcoding.HeaderConverter;
import grpcstarter.extensions.transcoding.JsonUtil;
import grpcstarter.extensions.transcoding.ReactiveTranscoder;
import grpcstarter.extensions.transcoding.ReactiveTranscodingExceptionResolver;
import grpcstarter.extensions.transcoding.Transcoder;
import grpcstarter.extensions.transcoding.TranscodingRuntimeException;
import grpcstarter.extensions.transcoding.TranscodingUtil;
import grpcstarter.extensions.transcoding.Util;
import grpcstarter.server.GrpcServerProperties;
import grpcstarter.server.GrpcServerStartedEvent;
import io.grpc.BindableService;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.server.HandlerFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * Reactive (WebFlux functional endpoint) transcoder that exposes gRPC methods over HTTP.
 *
 * <p>{@link #route(ServerRequest)} looks for a route matching the incoming request; on a hit it
 * stores the route in the request attributes and returns this instance as the handler.
 * {@link #handle(ServerRequest)} then builds the gRPC request message from the HTTP request and
 * invokes the method through the transcoding channel. Unary responses are written as a single
 * body; server-streaming responses are written as server-sent events.
 *
 * <p>The channel is created when a {@link GrpcServerStartedEvent} is received and is shut down in
 * {@link #destroy()}.
 */
public class DefaultReactiveTranscoder
        implements ReactiveTranscoder, ApplicationListener<GrpcServerStartedEvent>, DisposableBean {

    /** Request attribute under which {@link #route} hands the matched route over to {@link #handle}. */
    private static final String MATCHING_ROUTE = DefaultReactiveTranscoder.class.getName() + ".matchingRoute";

    /** Description passed to {@code ClientCall.cancel} when the HTTP side stops listening. */
    private static final String HTTP_CANCELLED = "HTTP request was cancelled by the client";

    /** Routes looked up by trimmed request path; only consulted for POST requests. */
    private final Map<String, Util.Route<ServerRequest>> autoMappingRoutes = new HashMap<>();

    /** Routes matched by evaluating their predicates against the request, in list order. */
    private final List<Util.Route<ServerRequest>> customRoutes = new ArrayList<>();

    private final HeaderConverter headerConverter;
    private final GrpcTranscodingProperties grpcTranscodingProperties;
    private final GrpcServerProperties grpcServerProperties;
    private final ReactiveTranscodingExceptionResolver transcodingExceptionResolver;

    /** Channel to the gRPC server; stays {@code null} until {@link GrpcServerStartedEvent} is received. */
    private Channel channel;

    /**
     * Creates the transcoder and populates both route collections from the given services.
     *
     * @param services gRPC services whose methods should be exposed over HTTP
     * @param headerConverter converts between HTTP headers and gRPC metadata
     * @param grpcTranscodingProperties transcoding configuration
     * @param grpcServerProperties gRPC server configuration
     * @param transcodingExceptionResolver resolves gRPC status errors of unary calls into HTTP responses
     */
    public DefaultReactiveTranscoder(
            List<BindableService> services,
            HeaderConverter headerConverter,
            GrpcTranscodingProperties grpcTranscodingProperties,
            GrpcServerProperties grpcServerProperties,
            ReactiveTranscodingExceptionResolver transcodingExceptionResolver) {
        Util.getReactiveRoutes(services, this.autoMappingRoutes, this.customRoutes, grpcTranscodingProperties);
        this.headerConverter = headerConverter;
        this.grpcTranscodingProperties = grpcTranscodingProperties;
        this.grpcServerProperties = grpcServerProperties;
        this.transcodingExceptionResolver = transcodingExceptionResolver;
    }

    /**
     * Creates the transcoding channel for the port reported by the started gRPC server.
     *
     * @param event the server-started event
     */
    @Override
    public void onApplicationEvent(GrpcServerStartedEvent event) {
        this.channel = Util.getTranscodingChannel(
                event.getSource().getPort(), this.grpcTranscodingProperties, this.grpcServerProperties);
    }

    /**
     * Finds the route for the request and, when one matches, records it under {@link #MATCHING_ROUTE}.
     *
     * <p>POST requests are first looked up in the auto-mapping routes by their path (passed through
     * {@code Util.trim(path, '/')}). If that lookup misses, or the method is not POST, the custom
     * routes are tried in order; a custom route matches when its main predicate or any of its
     * additional predicates accepts the request.
     *
     * @param request the incoming request
     * @return this instance as the handler when a route matches, otherwise an empty {@link Mono}
     */
    @Override
    @Nonnull
    public Mono<HandlerFunction<ServerResponse>> route(@Nonnull ServerRequest request) {
        if (Objects.equals(request.method(), HttpMethod.POST)) {
            Util.Route<ServerRequest> autoMapped = this.autoMappingRoutes.get(Util.trim(request.path(), '/'));
            if (autoMapped != null) {
                request.attributes().put(MATCHING_ROUTE, autoMapped);
                return Mono.just(this);
            }
        }
        for (Util.Route<ServerRequest> candidate : this.customRoutes) {
            boolean matches = candidate.predicate().test(request)
                    || candidate.additionalPredicates().stream().anyMatch(p -> p.test(request));
            if (matches) {
                request.attributes().put(MATCHING_ROUTE, candidate);
                return Mono.just(this);
            }
        }
        return Mono.empty();
    }

    /**
     * Invokes the gRPC method of the route previously stored by {@link #route(ServerRequest)}.
     *
     * @param request the incoming request
     * @return the HTTP response produced from the gRPC call
     * @throws IllegalStateException if no route was stored in the request attributes
     * @throws ResponseStatusException with status 400 if the method is neither unary nor server-streaming
     */
    @Override
    @Nonnull
    public Mono<ServerResponse> handle(@Nonnull ServerRequest request) {
        @SuppressWarnings("unchecked") // only route(ServerRequest) writes this attribute, always with this type
        Util.Route<ServerRequest> route = (Util.Route<ServerRequest>) request.attributes().get(MATCHING_ROUTE);
        if (route == null) {
            // Fail with a descriptive error instead of an anonymous NullPointerException below.
            throw new IllegalStateException(
                    "No transcoding route found in request attribute '" + MATCHING_ROUTE + "'");
        }
        MethodDescriptor.MethodType methodType = route.invokeMethod().getType();
        if (methodType == MethodDescriptor.MethodType.UNARY) {
            return processUnaryCall(request, route);
        }
        if (methodType == MethodDescriptor.MethodType.SERVER_STREAMING) {
            return processServerStreamingCall(request, route);
        }
        throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Unsupported rpc method type: " + methodType);
    }

    /** Opens a new call for the route's gRPC method on the given channel with default call options. */
    private static ClientCall<Object, Object> getCall(Channel channel, Util.Route<ServerRequest> route) {
        return channel.newCall(route.invokeMethod(), CallOptions.DEFAULT);
    }

    /** Reads the request body as one buffer, substituting an empty buffer when the request has no body. */
    private static Mono<DataBuffer> readBody(ServerRequest request) {
        return request.bodyToMono(DataBuffer.class)
                .defaultIfEmpty(request.exchange().getResponse().bufferFactory().wrap(new byte[0]));
    }

    /**
     * Runs a server-streaming call and emits every gRPC response message as a server-sent event
     * whose data is the transcoded message serialized by {@code JsonUtil.toJson}.
     *
     * <p>A {@link StatusRuntimeException} from the call is converted to a
     * {@link TranscodingRuntimeException} carrying the mapped HTTP status; other errors are
     * propagated unchanged.
     */
    private Mono<ServerResponse> processServerStreamingCall(ServerRequest request, Util.Route<ServerRequest> route) {
        return readBody(request).flatMap(buf -> {
            Transcoder transcoder = getTranscoder(request, buf);
            Message msg = getMessage(route, transcoder);
            Channel chan = ClientInterceptors.intercept(
                    this.channel,
                    MetadataUtils.newAttachHeadersInterceptor(
                            this.headerConverter.toMetadata(request.headers().asHttpHeaders())));
            ClientCall<Object, Object> call = getCall(chan, route);

            // The raw element type is required here: body(publisher, ServerSentEvent.class) can only
            // be inferred when the publisher's element type is exactly the raw ServerSentEvent.
            @SuppressWarnings("rawtypes")
            Flux<ServerSentEvent> events = Flux.create(sink -> {
                ClientCalls.asyncServerStreamingCall(call, msg, new StreamObserver<Object>() {
                    @Override
                    public void onNext(Object response) {
                        String json = JsonUtil.toJson(transcoder.out((Message) response, route.httpRule()));
                        sink.next(ServerSentEvent.builder().data(json).build());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        if (throwable instanceof StatusRuntimeException sre) {
                            sink.error(new TranscodingRuntimeException(
                                    TranscodingUtil.toHttpStatus(sre.getStatus()), sre.getMessage(), null));
                        } else {
                            sink.error(throwable);
                        }
                    }

                    @Override
                    public void onCompleted() {
                        sink.complete();
                    }
                });
                // Without this the gRPC stream keeps running after the HTTP client disconnects.
                // Registered after the call has started so start() never sees a cancelled call.
                sink.onCancel(() -> call.cancel(HTTP_CANCELLED, null));
            });
            return ServerResponse.ok().body(events, ServerSentEvent.class);
        });
    }

    /**
     * Builds a transcoder from the request body bytes, the query parameters and the URI template
     * variables stored on the exchange under {@code Util.URI_TEMPLATE_VARIABLES_ATTRIBUTE}.
     */
    private static Transcoder getTranscoder(ServerRequest request, DataBuffer buf) {
        return Transcoder.create(new Transcoder.Variable(
                getBytes(buf),
                convert(request.queryParams()),
                request.exchange().getAttribute(Util.URI_TEMPLATE_VARIABLES_ATTRIBUTE)));
    }

    /**
     * Runs a unary call and turns its single response message into an HTTP 200 response.
     *
     * <p>Response headers captured from the call are converted by the {@link HeaderConverter} and
     * added to the HTTP response. The content type is set to JSON only when
     * {@code JsonUtil.canParseJson} accepts the transcoded body. A {@link StatusRuntimeException}
     * is delegated to the {@link ReactiveTranscodingExceptionResolver}; other errors are
     * propagated unchanged.
     */
    private Mono<ServerResponse> processUnaryCall(ServerRequest request, Util.Route<ServerRequest> route) {
        return readBody(request).flatMap(buf -> {
            Transcoder transcoder = getTranscoder(request, buf);
            Message msg = getMessage(route, transcoder);
            AtomicReference<Metadata> responseHeaders = new AtomicReference<>();
            AtomicReference<Metadata> responseTrailers = new AtomicReference<>();
            Channel chan = ClientInterceptors.intercept(
                    this.channel,
                    MetadataUtils.newCaptureMetadataInterceptor(responseHeaders, responseTrailers),
                    MetadataUtils.newAttachHeadersInterceptor(
                            this.headerConverter.toMetadata(request.headers().asHttpHeaders())));
            ClientCall<Object, Object> call = getCall(chan, route);

            return Mono.<ServerResponse>create(sink -> {
                ClientCalls.asyncUnaryCall(call, msg, new StreamObserver<Object>() {
                    @Override
                    public void onNext(Object response) {
                        ServerResponse.BodyBuilder builder = ServerResponse.ok().headers(httpHeaders -> {
                            Metadata metadata = responseHeaders.get();
                            if (metadata != null) {
                                httpHeaders.addAll(headerConverter.toHttpHeaders(metadata));
                            }
                        });
                        Object body = transcoder.out((Message) response, route.httpRule());
                        if (JsonUtil.canParseJson(body)) {
                            builder.contentType(MediaType.APPLICATION_JSON);
                        }
                        builder.body(Mono.just(JsonUtil.toJson(body)), String.class)
                                .subscribe(sink::success, sink::error);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        if (throwable instanceof StatusRuntimeException sre) {
                            transcodingExceptionResolver.resolve(sink, sre);
                        } else {
                            sink.error(throwable);
                        }
                    }

                    @Override
                    public void onCompleted() {
                        // Has no effect when onNext already completed the sink with a response.
                        sink.success();
                    }
                });
                // Abort the gRPC call when the HTTP client goes away before a response is produced.
                // Registered after the call has started so start() never sees a cancelled call.
                sink.onCancel(() -> call.cancel(HTTP_CANCELLED, null));
            });
        });
    }

    /**
     * Builds the gRPC request message for the route.
     *
     * @throws ResponseStatusException with status 400 when the request cannot be parsed into the message
     */
    private static Message getMessage(Util.Route<?> route, Transcoder transcoder) {
        try {
            return Util.buildRequestMessage(transcoder, route);
        } catch (InvalidProtocolBufferException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage(), e);
        }
    }

    /**
     * Reads the whole buffer into a byte array. The stream is obtained with
     * {@code asInputStream(true)}, i.e. the buffer is asked to release itself when the stream closes.
     *
     * @throws IllegalStateException if reading or closing the stream fails
     */
    private static byte[] getBytes(DataBuffer buf) {
        try (InputStream is = buf.asInputStream(true)) {
            return is.readAllBytes();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read DataBuffer", e);
        }
    }

    /** Converts a multi-valued parameter map into a map of string arrays. */
    private static Map<String, String[]> convert(Map<String, List<String>> map) {
        return map.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toArray(String[]::new)));
    }

    /** Shuts the transcoding channel down, passing a 15 second timeout to {@code Util.shutdown}. */
    @Override
    public void destroy() throws Exception {
        // The channel only exists once the gRPC server has started; nothing to release otherwise.
        if (this.channel != null) {
            Util.shutdown(this.channel, Duration.ofSeconds(15L));
        }
    }
}

