/*
 * Decompiled with CFR 0.152.
 */
package ratpack.sse.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.sse.internal.Clock;

/**
 * Publisher decorator that relays the {@link ByteBuf}s of an upstream publisher and
 * additionally emits a fixed heartbeat frame ({@code ": keepalive heartbeat\n\n"})
 * whenever nothing has been written downstream for {@code heartBeatFrequency}.
 *
 * <p>Heartbeats are only emitted while downstream has outstanding demand. Every heartbeat
 * uses up one unit of that demand, so the number of heartbeats sent is remembered and
 * subtracted from later downstream requests before they are forwarded upstream.
 *
 * <p>The per-subscription state is held in plain, unsynchronized fields.
 * NOTE(review): this presumably relies on the executor and all stream signals running on
 * the same thread (e.g. one event loop) - confirm against the caller.
 */
public class ServerSentEventStreamKeepAlive implements Publisher<ByteBuf> {
    /** Shared heartbeat payload; wrapped as unreleasable and handed out as slices. */
    private static final ByteBuf HEARTBEAT = Unpooled.unreleasableBuffer(
        Unpooled.wrappedBuffer(": keepalive heartbeat\n\n".getBytes(StandardCharsets.UTF_8))
    );

    private final Publisher<? extends ByteBuf> upstream;
    private final ScheduledExecutorService executor;
    private final Duration heartBeatFrequency;
    private final Clock clock;

    /**
     * @param upstream the publisher whose items are relayed unchanged
     * @param executor used to schedule the "is a heartbeat due?" checks
     * @param heartBeatFrequency maximum quiet period before a heartbeat is emitted
     * @param clock source of the nanosecond timestamps used to measure the quiet period
     */
    public ServerSentEventStreamKeepAlive(Publisher<? extends ByteBuf> upstream, ScheduledExecutorService executor, Duration heartBeatFrequency, Clock clock) {
        this.upstream = upstream;
        this.executor = executor;
        this.heartBeatFrequency = heartBeatFrequency;
        this.clock = clock;
    }

    /**
     * Subscribes to the upstream publisher and relays its signals to {@code downstream},
     * interleaving heartbeat frames during quiet periods. The heartbeat timer is started
     * by the first downstream request and stopped on completion, error or cancellation.
     *
     * @param downstream the subscriber receiving upstream items and heartbeats
     */
    public void subscribe(final Subscriber<? super ByteBuf> downstream) {
        final long heartbeatFrequencyNanos = heartBeatFrequency.toNanos();
        upstream.subscribe(new Subscriber<ByteBuf>() {
            private Subscription subscription;
            // Handle of the most recently scheduled check; null until the first request and after stop().
            private ScheduledFuture<?> checkFuture;
            // Set once the stream has completed, failed or been cancelled; the timer must never be re-armed afterwards.
            private boolean stopped;
            // Clock reading taken at the last write (item or heartbeat) to downstream.
            long lastWriteAt;
            // Items requested from upstream that have not been delivered yet.
            long demand;
            // Heartbeats emitted that have not yet been deducted from a downstream request.
            long demandSurplusSent;
            // True when a heartbeat was due but there was no spare demand; no check is scheduled while this is set.
            boolean needsHeartbeat;

            /** Permanently stops the heartbeat timer. */
            private void stop() {
                stopped = true;
                needsHeartbeat = false;
                if (checkFuture != null) {
                    checkFuture.cancel(false);
                    checkFuture = null;
                }
            }

            /** Schedules the next heartbeat check, unless the stream has already stopped. */
            private void scheduleCheck(long inNanos) {
                if (stopped) {
                    return;
                }
                checkFuture = executor.schedule(this::check, inNanos, TimeUnit.NANOSECONDS);
            }

            /** Emits a heartbeat if one is due, parks it if there is no demand, or re-arms the timer. */
            private void check() {
                if (stopped) {
                    return;
                }
                long heartbeatDue = lastWriteAt + heartbeatFrequencyNanos;
                long nowNanos = clock.nanoTime();
                if (heartbeatDue <= nowNanos) {
                    if (demand > demandSurplusSent) {
                        emitHeartbeat();
                    } else {
                        // No spare demand: remember the heartbeat; request() or onNext() resumes the timer.
                        needsHeartbeat = true;
                    }
                } else {
                    // Something was written since this check was scheduled; wait out the remainder.
                    scheduleCheck(heartbeatDue - nowNanos);
                }
            }

            /** Writes one heartbeat downstream, records it as surplus, and re-arms the timer. */
            private void emitHeartbeat() {
                needsHeartbeat = false;
                ++demandSurplusSent;
                lastWriteAt = clock.nanoTime();
                downstream.onNext(HEARTBEAT.slice());
                // No-op if downstream cancelled from within onNext.
                scheduleCheck(heartbeatFrequencyNanos);
            }

            public void onSubscribe(Subscription s) {
                subscription = s;
                downstream.onSubscribe(new Subscription() {

                    public void request(long n) {
                        if (stopped) {
                            // Requests after a terminal signal or cancel must not restart the timer.
                            return;
                        }
                        if (n <= 0L) {
                            // Invalid per Reactive Streams rule 3.9: keep it out of the demand accounting
                            // and pass it on untouched so upstream can signal the violation.
                            subscription.request(n);
                            return;
                        }
                        if (checkFuture == null) {
                            // First request: start measuring the quiet period from now.
                            lastWriteAt = clock.nanoTime();
                            scheduleCheck(heartbeatFrequencyNanos);
                        }
                        // Heartbeats already sent have used up part of this request.
                        long adjustment = Math.min(n, demandSurplusSent);
                        demandSurplusSent -= adjustment;
                        long adjustedRequest = n - adjustment;
                        long total = demand + adjustedRequest;
                        // adjustedRequest is never negative, so a negative sum from a non-negative demand
                        // means overflow; saturate so that repeated Long.MAX_VALUE requests stay "unbounded".
                        demand = (demand >= 0L && total < 0L) ? Long.MAX_VALUE : total;
                        if (needsHeartbeat && demand > demandSurplusSent) {
                            emitHeartbeat();
                        }
                        if (adjustedRequest > 0L && !stopped) {
                            subscription.request(adjustedRequest);
                        }
                    }

                    public void cancel() {
                        stop();
                        subscription.cancel();
                    }
                });
            }

            public void onNext(ByteBuf byteBuf) {
                --demand;
                // A parked heartbeat means no check is currently scheduled; this write replaces the
                // heartbeat, so the timer has to be restarted or heartbeats would stop for good.
                boolean timerParked = needsHeartbeat;
                needsHeartbeat = false;
                lastWriteAt = clock.nanoTime();
                downstream.onNext(byteBuf.touch());
                if (timerParked) {
                    scheduleCheck(heartbeatFrequencyNanos);
                }
            }

            public void onError(Throwable t) {
                stop();
                downstream.onError(t);
            }

            public void onComplete() {
                stop();
                downstream.onComplete();
            }
        });
    }
}

