/*
 * Decompiled with CFR 0.152.
 */
package org.jooby.internal.netty;

import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import javaslang.concurrent.Promise;
import org.jooby.Sse;

public class NettySse
extends Sse {
    private ChannelHandlerContext ctx;

    public NettySse(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    protected void closeInternal() {
        this.ctx.close();
    }

    protected void handshake(Runnable handler) throws Exception {
        DefaultHttpHeaders headers = new DefaultHttpHeaders();
        headers.set((CharSequence)HttpHeaderNames.CONNECTION, (Object)"Close");
        headers.set((CharSequence)HttpHeaderNames.CONTENT_TYPE, (Object)"text/event-stream; charset=utf-8");
        this.ctx.writeAndFlush((Object)new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, (HttpHeaders)headers));
        this.ctx.executor().execute(handler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Promise<Optional<Object>> send(Optional<Object> id, byte[] data) {
        NettySse nettySse = this;
        synchronized (nettySse) {
            Promise promise = Promise.make((ExecutorService)MoreExecutors.newDirectExecutorService());
            this.ctx.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])data)).addListener((GenericFutureListener)new DoneCallback((Promise<Optional<Object>>)promise, id, arg_0 -> ((NettySse)this).ifClose(arg_0)));
            return promise;
        }
    }

    private static class DoneCallback
    implements ChannelFutureListener {
        private Promise<Optional<Object>> promise;
        private Consumer<Throwable> ifClose;
        private Optional<Object> id;

        public DoneCallback(Promise<Optional<Object>> promise, Optional<Object> id, Consumer<Throwable> ifClose) {
            this.id = id;
            this.promise = promise;
            this.ifClose = ifClose;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.promise.success(this.id);
            } else {
                Throwable cause = future.cause();
                this.promise.failure(cause);
                this.ifClose.accept(cause);
            }
        }
    }
}

