/*
 * Decompiled with CFR 0.152.
 */
package com.hotels.styx.server.netty.connectors;

import com.hotels.styx.api.Buffer;
import com.hotels.styx.api.Buffers;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.server.netty.connectors.ResponseTranslator;
import com.hotels.styx.server.netty.connectors.StyxToNettyResponseTranslator;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class HttpResponseWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpResponseWriter.class);
    private final AtomicLong writeOps = new AtomicLong(0L);
    private final AtomicLong contentBytesWritten = new AtomicLong(0L);
    private final AtomicLong writeOpsAcked = new AtomicLong(0L);
    private final AtomicLong contentBytesAcked = new AtomicLong(0L);
    private final AtomicBoolean contentCompleted = new AtomicBoolean(false);
    private final ChannelHandlerContext ctx;
    private final ResponseTranslator responseTranslator;

    HttpResponseWriter(ChannelHandlerContext ctx) {
        this(ctx, new StyxToNettyResponseTranslator());
    }

    HttpResponseWriter(ChannelHandlerContext ctx, ResponseTranslator responseTranslator) {
        this.ctx = Objects.requireNonNull(ctx);
        this.responseTranslator = Objects.requireNonNull(responseTranslator);
    }

    public CompletableFuture<Void> write(final LiveHttpResponse response) {
        final CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            this.writeHeaders(response).addListener((GenericFutureListener)((ChannelFutureListener)writeOp -> {
                if (writeOp.isSuccess()) {
                    this.writeOpsAcked.incrementAndGet();
                } else {
                    LOGGER.warn("Unable to send response headers. Written content bytes {}/{} (ackd/sent). Write events {}/{} (ackd/writes). Exception={}", new Object[]{this.contentBytesAcked.get(), this.contentBytesWritten.get(), this.writeOpsAcked.get(), this.writeOps.get(), writeOp.cause()});
                    future.completeExceptionally(writeOp.cause());
                }
            }));
            response.body().subscribe((Subscriber)new BaseSubscriber<Buffer>(){

                public void hookOnSubscribe(Subscription subscription) {
                    future.handle((ignore, cause) -> {
                        if (future.isCompletedExceptionally() && cause instanceof CancellationException) {
                            subscription.cancel();
                        }
                        return null;
                    });
                    subscription.request(1L);
                }

                public void hookOnComplete() {
                    if (!future.isDone()) {
                        HttpResponseWriter.this.nettyWriteAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener((GenericFutureListener)((ChannelFutureListener)this::onWriteEmptyLastChunkOutcome));
                        HttpResponseWriter.this.contentCompleted.set(true);
                        HttpResponseWriter.this.completeIfAllSent(future);
                    }
                }

                public void hookOnError(Throwable cause) {
                    LOGGER.warn("Content observable error. Written content bytes {}/{} (ackd/sent). Write events {}/{} (ackd/writes). Exception={}", new Object[]{HttpResponseWriter.this.contentBytesAcked.get(), HttpResponseWriter.this.contentBytesWritten.get(), HttpResponseWriter.this.writeOpsAcked.get(), HttpResponseWriter.this.writeOps.get(), cause});
                    future.completeExceptionally(cause);
                }

                public void hookOnNext(Buffer buffer) {
                    ByteBuf byteBuf = Buffers.toByteBuf((Buffer)buffer);
                    if (future.isDone()) {
                        byteBuf.release();
                    } else {
                        long bufSize = byteBuf.readableBytes();
                        HttpResponseWriter.this.contentBytesWritten.addAndGet(bufSize);
                        HttpResponseWriter.this.nettyWriteAndFlush(new DefaultHttpContent(byteBuf)).addListener(it -> this.onWriteOutcome((ChannelFuture)it, bufSize));
                    }
                }

                private void onWriteOutcome(ChannelFuture writeOp, long bufSize) {
                    if (writeOp.isSuccess()) {
                        HttpResponseWriter.this.contentBytesAcked.addAndGet(bufSize);
                        HttpResponseWriter.this.writeOpsAcked.incrementAndGet();
                        this.request(1L);
                        HttpResponseWriter.this.completeIfAllSent(future);
                    } else if (!future.isDone()) {
                        this.cancel();
                        LOGGER.warn("Write error. Written content bytes {}/{} (ackd/sent). Write events {}/{} (ackd/writes), Exception={}", new Object[]{HttpResponseWriter.this.contentBytesAcked.get(), HttpResponseWriter.this.contentBytesWritten.get(), HttpResponseWriter.this.writeOpsAcked.get(), HttpResponseWriter.this.writeOps.get(), response, writeOp.cause()});
                        future.completeExceptionally(writeOp.cause());
                    }
                }

                private void onWriteEmptyLastChunkOutcome(ChannelFuture writeOp) {
                    HttpResponseWriter.this.writeOpsAcked.incrementAndGet();
                    HttpResponseWriter.this.completeIfAllSent(future);
                    this.cancel();
                }
            });
            return future;
        }
        catch (Throwable cause) {
            LOGGER.warn("Failed to convert response headers. response={}, Cause={}", new Object[]{response, cause});
            Flux.from((Publisher)response.body().drop()).subscribe();
            future.completeExceptionally(cause);
            return future;
        }
    }

    private void completeIfAllSent(CompletableFuture<Void> future) {
        if (this.contentCompleted.get() && this.writeOps.get() == this.writeOpsAcked.get()) {
            future.complete(null);
        }
    }

    private ChannelFuture writeHeaders(LiveHttpResponse response) {
        HttpResponse nettyResponse = this.responseTranslator.toNettyResponse(response);
        if (!response.contentLength().isPresent() && !response.chunked()) {
            HttpHeaders.setTransferEncodingChunked((HttpMessage)nettyResponse);
        }
        return this.nettyWriteAndFlush(nettyResponse);
    }

    private ChannelFuture nettyWriteAndFlush(Object msg) {
        this.writeOps.incrementAndGet();
        return this.ctx.writeAndFlush(msg);
    }
}

