/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.resteasy.reactive.server.handlers;

import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
import org.jboss.resteasy.reactive.common.util.ServerMediaType;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.core.SseUtil;
import org.jboss.resteasy.reactive.server.core.StreamingUtil;
import org.jboss.resteasy.reactive.server.jaxrs.OutboundSseEventImpl;
import org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.resteasy.reactive.server.spi.StreamingResponse;
import org.reactivestreams.Publisher;

public class PublisherResponseHandler
implements ServerRestHandler {
    /** Substring searched for in the response media type to decide whether JSON framing is applied. */
    private static final String JSON = "json";
    private static final Logger log = Logger.getLogger(PublisherResponseHandler.class);
    /**
     * Handler chain passed to {@code requestContext.restart(...)} by the subscribers below.
     * Its only handler throws, so running this chain always fails with an {@link IllegalStateException}.
     */
    private static final ServerRestHandler[] AWOL = {
        new ServerRestHandler() {

            @Override
            public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
                throw new IllegalStateException("FAILURE: should never be restarted");
            }
        }
    };
    /** Customizers handed to every subscriber created by this handler; starts out empty. */
    private List<StreamingResponseCustomizer> streamingResponseCustomizers = Collections.emptyList();

    /**
     * Replaces the customizers that are handed to every subscriber this handler creates.
     *
     * @param streamingResponseCustomizers the customizers to use; {@code null} is treated as
     *        "no customizers" and stored as an empty list
     */
    public void setStreamingResponseCustomizers(List<StreamingResponseCustomizer> streamingResponseCustomizers) {
        // The field is initialised to an empty list; keep it non-null so the code that later
        // receives the list never has to deal with null.
        this.streamingResponseCustomizers = streamingResponseCustomizers == null
                ? Collections.<StreamingResponseCustomizer>emptyList()
                : streamingResponseCustomizers;
    }

    /**
     * Streams the request's result to the client when that result is a publisher.
     *
     * <p>A Reactive Streams {@link Publisher} is first adapted to a {@link Flow.Publisher}. Results
     * that are not publishers are left untouched. The target must declare exactly one produced
     * media type; server-sent events are dispatched to {@code handleSse}, anything else suspends
     * the request and is dispatched to chunked or plain streaming.
     *
     * @param requestContext the request being processed
     * @throws IllegalStateException if the target declares no produced media type, or more than one
     */
    @Override
    public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
        Object outcome = requestContext.getResult();
        if (outcome instanceof Publisher) {
            outcome = AdaptersToFlow.publisher((Publisher<?>) outcome);
        }
        if (!(outcome instanceof Flow.Publisher)) {
            // Not a publisher: nothing for this handler to do.
            return;
        }
        Flow.Publisher<?> publisher = (Flow.Publisher<?>) outcome;

        ServerMediaType declared = requestContext.getTarget().getProduces();
        if (declared == null) {
            throw new IllegalStateException("Negotiation or dynamic media type not supported yet for Multi: please use the @Produces annotation when returning a Multi");
        }
        MediaType[] candidates = declared.getSortedOriginalMediaTypes();
        if (candidates.length != 1) {
            throw new IllegalStateException("Negotiation or dynamic media type not supported yet for Multi: please use a single @Produces annotation");
        }

        MediaType mediaType = candidates[0];
        requestContext.setResponseContentType(mediaType);
        requestContext.setGenericReturnType(requestContext.getTarget().getReturnType());

        if (mediaType.isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) {
            // handleSse suspends the request itself, after it has set the headers.
            this.handleSse(requestContext, publisher);
            return;
        }

        requestContext.suspend();
        boolean json = mediaType.toString().contains(JSON);
        if (this.requiresChunkedStream(mediaType)) {
            this.handleChunkedStreaming(requestContext, publisher, json);
        } else {
            this.handleStreaming(requestContext, publisher, json);
        }
    }

    /**
     * Tells whether the given media type is routed to the chunked streaming subscriber.
     *
     * @param mediaType the single media type produced by the target
     * @return {@code true} when the type is compatible with
     *         {@code RestMediaType.APPLICATION_NDJSON_TYPE} or
     *         {@code RestMediaType.APPLICATION_STREAM_JSON_TYPE}
     */
    private boolean requiresChunkedStream(MediaType mediaType) {
        if (mediaType.isCompatible(RestMediaType.APPLICATION_NDJSON_TYPE)) {
            return true;
        }
        return mediaType.isCompatible(RestMediaType.APPLICATION_STREAM_JSON_TYPE);
    }

    /**
     * Subscribes a {@code ChunkedStreamingMultiSubscriber} to the publisher.
     *
     * @param requestContext the request being answered
     * @param result the publisher whose items are streamed
     * @param json whether the response media type was detected as JSON
     */
    private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Flow.Publisher<?> result, boolean json) {
        ChunkedStreamingMultiSubscriber chunkedSubscriber =
                new ChunkedStreamingMultiSubscriber(requestContext, this.streamingResponseCustomizers, json);
        result.subscribe(chunkedSubscriber);
    }

    /**
     * Subscribes a {@code StreamingMultiSubscriber} to the publisher.
     *
     * @param requestContext the request being answered
     * @param result the publisher whose items are streamed
     * @param json whether the response media type was detected as JSON
     */
    private void handleStreaming(ResteasyReactiveRequestContext requestContext, Flow.Publisher<?> result, boolean json) {
        StreamingMultiSubscriber streamingSubscriber =
                new StreamingMultiSubscriber(requestContext, this.streamingResponseCustomizers, json);
        result.subscribe(streamingSubscriber);
    }

    /**
     * Starts a server-sent-events response for the given publisher.
     *
     * <p>Sets the headers through {@code SseUtil.setHeaders}, suspends the request and writes
     * {@code SseEventSinkImpl.EMPTY_BUFFER}. The publisher is subscribed to only after that write
     * reports no error; if the write fails the request is resumed with the failure.
     *
     * @param requestContext the request being answered
     * @param result the publisher whose items become SSE events
     */
    private void handleSse(final ResteasyReactiveRequestContext requestContext, final Flow.Publisher<?> result) {
        SseUtil.setHeaders(requestContext, requestContext.serverResponse(), this.streamingResponseCustomizers);
        requestContext.suspend();
        Consumer<Throwable> onInitialWrite = new Consumer<Throwable>() {

            @Override
            public void accept(Throwable failure) {
                if (failure != null) {
                    requestContext.resume(failure);
                    return;
                }
                // The customizer list is read here, at callback time, not when handleSse was called.
                result.subscribe(new SseMultiSubscriber(requestContext, PublisherResponseHandler.this.streamingResponseCustomizers));
            }
        };
        requestContext.serverResponse().write(SseEventSinkImpl.EMPTY_BUFFER, onInitialWrite);
    }

    /**
     * Hook that adjusts a {@link StreamingResponse}; the list of customizers is handed to the
     * subscribers created by the enclosing handler.
     */
    public static interface StreamingResponseCustomizer {
        /**
         * Applies this customization to the given response.
         *
         * @param var1 the streaming response to modify
         */
        public void customize(StreamingResponse<?> var1);

        /** Customizer that sets a fixed collection of response headers. */
        public static class AddHeadersCustomizer
        implements StreamingResponseCustomizer {
            /** Header name to header values; {@code null} until set when the no-arg constructor is used. */
            private Map<String, List<String>> headers;

            /**
             * @param headers header name to header values
             */
            public AddHeadersCustomizer(Map<String, List<String>> headers) {
                this.headers = headers;
            }

            /** Creates a customizer without headers; populate it with {@link #setHeaders(Map)}. */
            public AddHeadersCustomizer() {
            }

            /**
             * @return the configured headers, possibly {@code null} if none were ever set
             */
            public Map<String, List<String>> getHeaders() {
                return this.headers;
            }

            /**
             * @param headers header name to header values
             */
            public void setHeaders(Map<String, List<String>> headers) {
                this.headers = headers;
            }

            /**
             * Sets every configured header on the response. Does nothing when no headers were configured.
             *
             * @param streamingResponse the response receiving the headers
             */
            @Override
            public void customize(StreamingResponse<?> streamingResponse) {
                if (this.headers == null) {
                    // Built through the no-arg constructor and never populated: nothing to add.
                    return;
                }
                for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) {
                    // A direct cast from List<String> to Iterable<CharSequence> is rejected by the
                    // compiler, so go through Iterable<?>. This is safe because every String is a
                    // CharSequence and the values are only handed over for reading here.
                    @SuppressWarnings("unchecked")
                    Iterable<CharSequence> values = (Iterable<CharSequence>) (Iterable<?>) entry.getValue();
                    streamingResponse.setResponseHeader(entry.getKey(), values);
                }
            }
        }

        /** Customizer that sets a fixed status code on the response. */
        public static class StatusCustomizer
        implements StreamingResponseCustomizer {
            private int status;

            /**
             * @param status the status code to set
             */
            public StatusCustomizer(int status) {
                this.status = status;
            }

            /** Creates a customizer whose status is {@code 0} until {@link #setStatus(int)} is called. */
            public StatusCustomizer() {
            }

            /**
             * @return the status code this customizer sets
             */
            public int getStatus() {
                return this.status;
            }

            /**
             * @param status the status code to set
             */
            public void setStatus(int status) {
                this.status = status;
            }

            /**
             * Sets the configured status code on the response.
             *
             * @param streamingResponse the response receiving the status code
             */
            @Override
            public void customize(StreamingResponse<?> streamingResponse) {
                streamingResponse.setStatusCode(this.status);
            }
        }
    }

    static abstract class AbstractMultiSubscriber
    implements Flow.Subscriber<Object> {
        protected Flow.Subscription subscription;
        protected ResteasyReactiveRequestContext requestContext;
        protected List<StreamingResponseCustomizer> customizers;
        private boolean weClosed = false;

        AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> customizers) {
            this.requestContext = requestContext;
            this.customizers = customizers;
            requestContext.restart(AWOL, true);
            requestContext.serverResponse().addCloseHandler(() -> {
                if (!this.weClosed && this.subscription != null) {
                    this.subscription.cancel();
                }
            });
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1L);
        }

        @Override
        public void onComplete() {
            this.weClosed = true;
            this.requestContext.serverResponse().end();
            this.requestContext.close();
        }

        @Override
        public void onError(Throwable t) {
            this.handleException(this.requestContext, t);
        }

        protected void handleException(ResteasyReactiveRequestContext requestContext, Throwable t) {
            if (requestContext.serverResponse().headWritten()) {
                log.error((Object)"Exception in SSE server handling, impossible to send it to client", t);
            } else {
                requestContext.resume(t, true);
            }
        }
    }

    /**
     * Subscriber that writes each item through {@code StreamingUtil.send} and requests the next
     * item only after the previous send has completed.
     *
     * <p>When {@code json} is set, items are prefixed with {@code "["} (first item) or
     * {@code ","} (later items) and the text from {@link #onCompleteText()} is written on
     * completion.
     */
    private static class StreamingMultiSubscriber
    extends AbstractMultiSubscriber {
        private boolean json;
        /** Prefix for the next item in JSON mode: {@code "["} before the first send, {@code ","} afterwards. */
        private String nextJsonPrefix;
        /** Whether {@code onNext} has been called at least once. */
        private boolean hadItem;

        /**
         * @param requestContext the request being answered
         * @param customizers the customizers passed on to {@code StreamingUtil}
         * @param json whether JSON prefixes and postfix are emitted
         */
        StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> customizers, boolean json) {
            super(requestContext, customizers);
            this.json = json;
            this.nextJsonPrefix = "[";
            this.hadItem = false;
        }

        /**
         * Sends the item; on success requests one more item, on failure cancels the subscription
         * and reports the failure through {@code handleException}.
         */
        @Override
        public void onNext(Object item) {
            this.hadItem = true;
            StreamingUtil.send(this.requestContext, this.customizers, item, this.messagePrefix()).handle(new BiFunction<Object, Throwable, Object>(){

                @Override
                public Object apply(Object v, Throwable t) {
                    if (t != null) {
                        try {
                            subscription.cancel();
                        }
                        catch (Throwable cancelFailure) {
                            // Do not lose the secondary failure and do not dump it on stderr:
                            // attach it to the primary failure so it is reported together with it.
                            // The identity check avoids addSuppressed's self-suppression error.
                            if (cancelFailure != t) {
                                t.addSuppressed(cancelFailure);
                            }
                        }
                        // Qualified with the enclosing instance: handleException lives on the
                        // subscriber, not on this anonymous BiFunction.
                        StreamingMultiSubscriber.this.handleException(requestContext, t);
                    } else {
                        nextJsonPrefix = ",";
                        subscription.request(1L);
                    }
                    return null;
                }
            });
        }

        /**
         * Finishes the response. If no item was ever received the headers are set here through
         * {@code StreamingUtil.setHeaders}; in JSON mode the closing text is written before the
         * response is ended.
         */
        @Override
        public void onComplete() {
            if (!this.hadItem) {
                StreamingUtil.setHeaders(this.requestContext, this.requestContext.serverResponse(), this.customizers);
            }
            if (this.json) {
                String postfix = this.onCompleteText();
                byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII);
                // The response is ended whether or not the postfix write succeeded.
                this.requestContext.serverResponse().write(postfixBytes).handle((v, t) -> {
                    super.onComplete();
                    return null;
                });
            } else {
                super.onComplete();
            }
        }

        /**
         * @return the text written on completion in JSON mode: {@code "[]"} when no item was
         *         received, {@code "]"} otherwise
         */
        protected String onCompleteText() {
            return this.hadItem ? "]" : "[]";
        }

        /**
         * @return the prefix for the item about to be sent, or {@code null} outside JSON mode
         */
        protected String messagePrefix() {
            return this.json ? this.nextJsonPrefix : null;
        }
    }

    /**
     * Streaming subscriber that separates items with a line feed instead of JSON array
     * punctuation: no prefix before the first item, {@code "\n"} before every later one, and
     * {@code "\n"} as the completion text.
     */
    private static class ChunkedStreamingMultiSubscriber
    extends StreamingMultiSubscriber {
        private static final String LINE_SEPARATOR = "\n";
        /** True until {@link #messagePrefix()} has been called once. */
        private boolean isFirstItem = true;

        /**
         * @param requestContext the request being answered
         * @param customizers the customizers passed on to the parent subscriber
         * @param json forwarded unchanged to the parent subscriber
         */
        ChunkedStreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> customizers, boolean json) {
            super(requestContext, customizers, json);
        }

        /**
         * @return {@code null} on the first call, the line separator on every later call
         */
        @Override
        protected String messagePrefix() {
            boolean first = this.isFirstItem;
            this.isFirstItem = false;
            return first ? null : LINE_SEPARATOR;
        }

        /**
         * @return the line separator, regardless of whether any item was sent
         */
        @Override
        protected String onCompleteText() {
            return LINE_SEPARATOR;
        }
    }

    /**
     * Subscriber that sends each item as a server-sent event through {@code SseUtil.send} and
     * requests the next item only after the previous send has completed.
     */
    private static class SseMultiSubscriber
    extends AbstractMultiSubscriber {
        /**
         * @param requestContext the request being answered
         * @param customizers the customizers passed on to {@code SseUtil.send}
         */
        SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> customizers) {
            super(requestContext, customizers);
        }

        /**
         * Sends the item as an event. Items that already are {@link OutboundSseEvent}s are sent
         * as-is; anything else becomes the data of a newly built event. On success one more item
         * is requested; on failure the subscription is cancelled and the failure is reported
         * through {@code handleException}.
         */
        @Override
        public void onNext(Object item) {
            // Typed as OutboundSseEvent (not Object) so it can be passed to SseUtil.send.
            OutboundSseEvent event;
            if (item instanceof OutboundSseEvent) {
                event = (OutboundSseEvent) item;
            } else {
                event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
            }
            SseUtil.send(this.requestContext, event, this.customizers).whenComplete(new BiConsumer<Object, Throwable>(){

                @Override
                public void accept(Object v, Throwable t) {
                    if (t != null) {
                        subscription.cancel();
                        // Qualified with the enclosing instance: handleException lives on the
                        // subscriber, not on this anonymous BiConsumer.
                        SseMultiSubscriber.this.handleException(requestContext, t);
                    } else {
                        subscription.request(1L);
                    }
                }
            });
        }
    }
}

