/*
 * Decompiled with CFR 0.152.
 */
package ratpack.sse;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.api.Nullable;
import ratpack.exec.Execution;
import ratpack.exec.internal.DefaultExecution;
import ratpack.func.Action;
import ratpack.handling.Context;
import ratpack.http.Response;
import ratpack.http.internal.HttpHeaderConstants;
import ratpack.render.Renderable;
import ratpack.sse.Event;
import ratpack.sse.ServerSentEvent;
import ratpack.sse.ServerSentEventsBuilder;
import ratpack.sse.internal.DefaultEvent;
import ratpack.sse.internal.ServerSentEventEncoder;
import ratpack.sse.internal.ServerSentEventStreamBufferSettings;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.bytebuf.internal.ByteBufBufferingSubscription;

/**
 * A {@link Renderable} that writes a publisher of {@link ServerSentEvent}s to the response
 * as a {@code text/event-stream}.
 *
 * <p>Instances are created through {@link #builder()} or the deprecated
 * {@link #serverSentEvents(Publisher, Action)} factory; the constructor is private.
 */
public class ServerSentEvents
implements Renderable {
    /**
     * Shared, unreleasable buffer wrapping the UTF-8 bytes of {@code ": keepalive heartbeat\n\n"}.
     * It is handed to the buffering subscription created in {@code renderStream}.
     */
    private static final ByteBuf HEARTBEAT = Unpooled.unreleasableBuffer((ByteBuf)Unpooled.wrappedBuffer((byte[])": keepalive heartbeat\n\n".getBytes(StandardCharsets.UTF_8)));
    /** The events to render. */
    private final Publisher<? extends ServerSentEvent> publisher;
    /**
     * When {@code true}, {@code render} waits for the first signal from {@link #publisher} and
     * sends a 204 No Content response if the publisher completes without emitting an event.
     */
    private final boolean noContentOnEmpty;
    /** Heartbeat interval; when {@code null}, {@code renderStream} passes {@link Duration#ZERO} instead. */
    @Nullable
    private final Duration heartbeatFrequency;
    /**
     * Buffering configuration; when {@code null}, {@code renderStream} uses a zero window,
     * a watermark of 0 bytes and an initial buffer capacity of 4096 bytes.
     */
    @Nullable
    private final ServerSentEventStreamBufferSettings bufferSettings;

    /**
     * Creates a new builder for configuring how a stream of events is rendered.
     *
     * @return a fresh builder with no options set
     */
    public static ServerSentEventsBuilder builder() {
        ServerSentEventsBuilder freshBuilder = new BuilderImpl();
        return freshBuilder;
    }

    /**
     * Creates a renderable from arbitrary items, converting each item to an event via
     * {@code DefaultEvent.toEvents(publisher, action)}.
     *
     * <p>The result never uses the "no content on empty" mode and has neither a heartbeat
     * nor buffer settings configured.
     *
     * @param publisher the source items
     * @param action the action handed to {@code DefaultEvent.toEvents} together with the publisher
     * @param <T> the item type
     * @return a renderable for the converted events
     * @deprecated see {@link #builder()} for the builder-based way of creating instances
     */
    @Deprecated
    public static <T> ServerSentEvents serverSentEvents(Publisher<T> publisher, Action<? super Event<T>> action) {
        TransformablePublisher<ServerSentEvent> converted = DefaultEvent.toEvents(publisher, action);
        boolean noContentOnEmpty = false;
        return new ServerSentEvents(converted, noContentOnEmpty, null, null);
    }

    /**
     * Stores the rendering configuration as-is; no validation happens here.
     *
     * @param publisher the events to render
     * @param noContentOnEmpty whether an empty stream should be answered with 204 No Content
     * @param heartbeatFrequency heartbeat interval, or {@code null} for none
     * @param bufferSettings buffering configuration, or {@code null} for the defaults
     */
    private ServerSentEvents(Publisher<? extends ServerSentEvent> publisher, boolean noContentOnEmpty, @Nullable Duration heartbeatFrequency, @Nullable ServerSentEventStreamBufferSettings bufferSettings) {
        this.bufferSettings = bufferSettings;
        this.heartbeatFrequency = heartbeatFrequency;
        this.noContentOnEmpty = noContentOnEmpty;
        this.publisher = publisher;
    }

    /**
     * Exposes the underlying events through the legacy {@link Event} type.
     *
     * <p>Each call builds a new mapped publisher by applying
     * {@code DefaultEvent.fromServerSentEvent} to every element of the underlying publisher.
     *
     * @return the mapped publisher produced by {@code Streams.map}
     */
    @Deprecated
    @Nullable
    public Publisher<? extends Event<?>> getPublisher() {
        Publisher<? extends ServerSentEvent> source = this.publisher;
        return Streams.map(source, DefaultEvent::fromServerSentEvent);
    }

    /**
     * Renders the events to the response of the given context.
     *
     * <p>Adds the no-cache {@code Cache-Control} and {@code Pragma} header constants first,
     * then either streams directly or, in "no content on empty" mode, defers the decision
     * until the first signal from the publisher.
     *
     * @param context the context whose response is written to
     * @throws Exception declared by {@link Renderable}
     */
    @Override
    public void render(Context context) throws Exception {
        Response res = context.getResponse();
        res.getHeaders().add(HttpHeaderConstants.CACHE_CONTROL, HttpHeaderConstants.NO_CACHE_FULL);
        res.getHeaders().add(HttpHeaderConstants.PRAGMA, HttpHeaderConstants.NO_CACHE);
        if (!this.noContentOnEmpty) {
            // Common case: start streaming straight away.
            this.renderStream(context, this.publisher);
            return;
        }
        this.renderWithNoContentOnEmpty(context);
    }

    /**
     * Renders the stream, but decides between an event stream and an empty 204 response
     * only after the first signal from {@code publisher} has been observed.
     *
     * <p>The publisher is subscribed to from a forked execution started on the current
     * execution's event loop. The first signal resumes the delimited continuation with one of:
     * <ul>
     *   <li>first event: {@code renderStream} with that event followed by the rest of the stream;</li>
     *   <li>error before any event: {@code context.error(t)};</li>
     *   <li>completion before any event: {@code emptyStream(context)}.</li>
     * </ul>
     *
     * @param context the context to render to
     */
    private void renderWithNoContentOnEmpty(final Context context) {
        DefaultExecution execution = DefaultExecution.require();
        // NOTE(review): context::error is presumably the error handler for the delimited segment — confirm against DefaultExecution.delimit.
        execution.delimit(context::error, continuation -> Execution.fork().eventLoop(execution.getEventLoop()).start(e -> this.publisher.subscribe((Subscriber)new Subscriber<ServerSentEvent>(){
            // Upstream subscription; later handed to the downstream subscriber of the "rest" publisher.
            private Subscription subscription;
            // Downstream subscriber; stays null until the "rest" publisher built in onNext is subscribed to.
            private Subscriber subscriber;

            public void onSubscribe(Subscription s) {
                this.subscription = s;
                // Ask for exactly one item so the empty/non-empty decision can be made.
                this.subscription.request(1L);
            }

            public void onNext(ServerSentEvent event) {
                if (this.subscriber == null) {
                    // No downstream yet: replay this event, then continue with whatever upstream emits next.
                    TransformablePublisher consumedPublisher = Streams.publish(Collections.singleton(event));
                    // On subscribe, records the downstream subscriber and gives it the upstream subscription directly.
                    Publisher restPublisher = s -> {
                        this.subscriber = s;
                        s.onSubscribe(Objects.requireNonNull(this.subscription));
                    };
                    continuation.resume(() -> this.lambda$onNext$1(context, (Publisher)consumedPublisher, restPublisher));
                } else {
                    // Downstream attached: pass the event through.
                    this.subscriber.onNext((Object)event);
                }
            }

            public void onError(Throwable t) {
                if (this.subscriber == null) {
                    // Failed before a downstream subscriber was attached: report through the context.
                    continuation.resume(() -> context.error(t));
                } else {
                    this.subscriber.onError(t);
                }
            }

            public void onComplete() {
                if (this.subscriber == null) {
                    // Completed before a downstream subscriber was attached: answer with 204 No Content.
                    continuation.resume(() -> ServerSentEvents.emptyStream(context));
                } else {
                    this.subscriber.onComplete();
                }
            }

            // Decompiler-emitted body of the lambda passed to continuation.resume in onNext:
            // renders the already-consumed first event followed by the rest of the stream.
            private /* synthetic */ void lambda$onNext$1(Context context2, Publisher consumedPublisher, Publisher restPublisher) throws Exception {
                ServerSentEvents.this.renderStream(context2, (Publisher<? extends ServerSentEvent>)((Publisher)Streams.concat(Arrays.asList(consumedPublisher, restPublisher))));
            }
        })));
    }

    /**
     * Streams the given events to the client as a chunked {@code text/event-stream} response.
     *
     * <p>Events are encoded into a {@link ByteBuf} by a {@link ByteBufBufferingSubscription}
     * subclass. Without buffer settings the watermark is 0 bytes (the buffer counts as full
     * as soon as anything has been encoded) and buffers start at 4096 bytes; otherwise the
     * configured byte count is used for both. A {@code null} buffer window or heartbeat
     * frequency is passed on as {@link Duration#ZERO}.
     *
     * @param context the context whose response and channel are used
     * @param events the events to encode and send
     */
    private void renderStream(Context context, Publisher<? extends ServerSentEvent> events) {
        Response response = context.getResponse();
        response.getHeaders().add(HttpHeaderConstants.CONTENT_TYPE, HttpHeaderConstants.TEXT_EVENT_STREAM_CHARSET_UTF_8);
        response.getHeaders().add(HttpHeaderConstants.TRANSFER_ENCODING, HttpHeaderConstants.CHUNKED);
        final ByteBufAllocator byteBufAllocator = context.getDirectChannelAccess().getChannel().alloc();
        // The channel's event loop is itself a ScheduledExecutorService, so no cast is required.
        final ScheduledExecutorService executor = context.getDirectChannelAccess().getChannel().eventLoop();
        Publisher<ByteBuf> buffers = downstream -> downstream.onSubscribe(new ByteBufBufferingSubscription<ServerSentEvent>(
            events,
            ServerSentEvent::close,
            downstream,
            executor,
            System::nanoTime,
            this.bufferSettings == null ? Duration.ZERO : this.bufferSettings.window,
            this.heartbeatFrequency == null ? Duration.ZERO : this.heartbeatFrequency,
            HEARTBEAT
        ) {
            // Plain field initializers: they run right after the superclass constructor,
            // which is what the decompiled "super(arg0, ...)" initializer block stood for.
            final int watermark = ServerSentEvents.this.bufferSettings == null ? 0 : ServerSentEvents.this.bufferSettings.bytes;
            final int bufferSize = ServerSentEvents.this.bufferSettings == null ? 4096 : ServerSentEvents.this.bufferSettings.bytes;
            // Current accumulation buffer; null while nothing is pending.
            ByteBuf buffer;

            @Override
            protected void buffer(ServerSentEvent item) {
                if (this.buffer == null) {
                    // Allocate lazily so an idle stream holds no buffer.
                    this.buffer = byteBufAllocator.buffer(this.bufferSize);
                }
                try {
                    ServerSentEventEncoder.encodeTo(item, this.buffer);
                }
                finally {
                    // The event is closed whether or not encoding succeeded.
                    item.close();
                }
            }

            @Override
            protected boolean bufferIsFull() {
                return this.buffer.readableBytes() >= this.watermark;
            }

            @Override
            protected ByteBuf flush() {
                // Hand the buffer over to the caller and start from scratch on the next event.
                ByteBuf emittedBuffer = this.buffer;
                this.buffer = null;
                return emittedBuffer;
            }

            @Override
            protected void discard() {
                ReferenceCountUtil.safeRelease(this.buffer);
                this.buffer = null;
            }

            @Override
            protected boolean isEmpty() {
                return this.buffer == null;
            }
        });
        response.sendStream(buffers);
    }

    /**
     * Answers the request with a bodiless 204 No Content response.
     *
     * @param ctx the context whose response is sent
     */
    private static void emptyStream(Context ctx) {
        Response response = ctx.getResponse();
        int noContentCode = HttpResponseStatus.NO_CONTENT.code();
        response.status(noContentCode).send();
    }

    /**
     * Mutable {@link ServerSentEventsBuilder} that collects the rendering options and
     * passes them to the {@link ServerSentEvents} constructor in {@link #build(Publisher)}.
     */
    private static class BuilderImpl
    implements ServerSentEventsBuilder {
        /** Whether an empty stream should be answered with 204 No Content; off by default. */
        private boolean noContentOnEmpty;
        /** Buffering configuration; {@code null} until {@link #buffered(int, Duration)} is called. */
        private ServerSentEventStreamBufferSettings bufferSettings;
        /** Heartbeat interval; {@code null} until {@link #keepAlive(Duration)} is called. */
        private Duration keepAliveHeartbeat;

        private BuilderImpl() {
        }

        /**
         * Enables buffering of encoded events.
         *
         * @param numBytes the buffer size in bytes; must be at least 1
         * @param duration the buffer window; must not be negative
         * @return this builder
         * @throws IllegalArgumentException if {@code numBytes < 1} or {@code duration} is negative
         */
        @Override
        public ServerSentEventsBuilder buffered(int numBytes, Duration duration) {
            if (numBytes < 1) {
                // Reject instead of merely printing: the value becomes both the flush watermark
                // and the buffer capacity when the stream is rendered.
                throw new IllegalArgumentException("numBytes must be > 0");
            }
            if (duration.isNegative()) {
                throw new IllegalArgumentException("duration must be zero or positive");
            }
            this.bufferSettings = new ServerSentEventStreamBufferSettings(numBytes, duration);
            return this;
        }

        /**
         * Requests a 204 No Content response when the event stream completes without emitting.
         *
         * @return this builder
         */
        @Override
        public ServerSentEventsBuilder noContentOnEmpty() {
            this.noContentOnEmpty = true;
            return this;
        }

        /**
         * Sets the heartbeat interval.
         *
         * @param heartbeatAfterIdleFor the interval; must be strictly positive
         * @return this builder
         * @throws IllegalArgumentException if the duration is zero or negative
         */
        @Override
        public ServerSentEventsBuilder keepAlive(Duration heartbeatAfterIdleFor) {
            if (heartbeatAfterIdleFor.isNegative() || heartbeatAfterIdleFor.isZero()) {
                throw new IllegalArgumentException("duration must be positive");
            }
            this.keepAliveHeartbeat = heartbeatAfterIdleFor;
            return this;
        }

        /**
         * Creates the renderable for the given events using the options collected so far.
         *
         * @param events the events to render
         * @return a new {@link ServerSentEvents}
         */
        @Override
        public ServerSentEvents build(Publisher<? extends ServerSentEvent> events) {
            return new ServerSentEvents(events, this.noContentOnEmpty, this.keepAliveHeartbeat, this.bufferSettings);
        }
    }
}

