/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.r2.transport.http.client.stream.http2;

import com.linkedin.data.ByteString;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.netty.common.NettyRequestAdapter;
import com.linkedin.r2.transport.common.bridge.common.RequestWithCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.client.AsyncPoolHandle;
import com.linkedin.r2.transport.http.client.TimeoutAsyncPoolHandle;
import com.linkedin.r2.transport.http.client.stream.OrderedEntityStreamReader;
import com.linkedin.r2.transport.http.client.stream.http2.Http2ClientPipelineInitializer;
import com.linkedin.r2.transport.http.client.stream.http2.Http2PipelinePropertyUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HTTP/2 connection handler that encodes outbound {@link RequestWithCallback} messages into
 * HTTP/2 HEADERS and DATA frames, and reports stream and connection errors to the
 * {@link TransportCallback} registered for each stream.
 *
 * <p>Messages that are not a {@link RequestWithCallback} are passed down the pipeline untouched.
 *
 * <p>Raw uses of {@code RequestWithCallback}, {@code TransportCallback} and
 * {@code TimeoutAsyncPoolHandle} are kept as-is because their generic signatures are not visible
 * from this file.
 */
class Http2StreamCodec
extends Http2ConnectionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(Http2StreamCodec.class);
    public static final String PIPELINE_HTTP2_CODEC_HANDLER = "http2Handler";
    private static final int NO_PADDING = 0;
    private static final int NO_DATA = 0;
    private static final boolean NOT_END_STREAM = false;
    private static final boolean END_STREAM = true;

    Http2StreamCodec(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) {
        super(decoder, encoder, initialSettings);
    }

    /**
     * Writes a request as HTTP/2 frames on a newly allocated local stream.
     *
     * <p>Once the HEADERS frame has been written successfully, the request callback and the pool
     * handle are stored as stream properties and a timeout task that resets the stream is added to
     * the handle. If the HEADERS write fails, the callback receives the failure, the handle is
     * released, and the stream is reset if it exists.
     *
     * @param ctx     handler context used for writing
     * @param msg     message to write; anything other than a {@link RequestWithCallback} is forwarded as-is
     * @param promise promise completed by the HEADERS write (or by the forwarded write)
     * @throws IllegalArgumentException if the wrapped request is neither a {@link StreamRequest}
     *                                  nor a {@link RestRequest}; the pool handle is released first
     * @throws Exception                if building the HTTP/2 headers fails
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!(msg instanceof RequestWithCallback)) {
            ctx.write(msg, promise);
            return;
        }
        RequestWithCallback requestWithCallback = (RequestWithCallback)msg;
        Request request = requestWithCallback.request();

        // Validate the request type before allocating a stream id so that a rejected request
        // does not consume one.
        if (!(request instanceof StreamRequest) && !(request instanceof RestRequest)) {
            requestWithCallback.handle().release();
            throw new IllegalArgumentException("Request is neither StreamRequest or RestRequest");
        }

        int streamId = connection().local().incrementAndGetNextStreamId();
        ChannelFuture headersFuture = request instanceof StreamRequest
            ? writeStreamRequest(ctx, (StreamRequest)request, streamId, requestWithCallback.handle(), promise)
            : writeRestRequest(ctx, (RestRequest)request, streamId, promise);

        TransportCallback callback = requestWithCallback.callback();
        TimeoutAsyncPoolHandle handle = (TimeoutAsyncPoolHandle)requestWithCallback.handle();
        // Registered after the payload listener added by the helpers above, preserving listener order.
        headersFuture.addListener(future -> {
            if (future.isSuccess()) {
                registerStream(ctx, streamId, callback, handle);
            } else {
                failRequest(ctx, streamId, future.cause(), callback, handle);
            }
        });
    }

    /**
     * Writes the HEADERS frame of a streaming request and, once it succeeds, starts pulling
     * entity chunks through a {@link BufferedReader}.
     */
    private ChannelFuture writeStreamRequest(ChannelHandlerContext ctx, StreamRequest streamRequest, int streamId, AsyncPoolHandle<?> poolHandle, ChannelPromise promise) throws Exception {
        Http2ConnectionEncoder encoder = encoder();
        Http2Headers http2Headers = NettyRequestAdapter.toHttp2Headers(streamRequest);
        BufferedReader bufferedReader = new BufferedReader(ctx, encoder, streamId, poolHandle);
        OrderedEntityStreamReader reader = new OrderedEntityStreamReader(ctx, bufferedReader);
        streamRequest.getEntityStream().setReader(reader);
        LOG.debug("Sent HTTP/2 HEADERS frame, stream={}, end={}, headers={}, padding={}bytes", streamId, NOT_END_STREAM, http2Headers.size(), NO_PADDING);
        ChannelFuture headersFuture = encoder.writeHeaders(ctx, streamId, http2Headers, NO_PADDING, NOT_END_STREAM, promise);
        headersFuture.addListener(future -> {
            if (future.isSuccess()) {
                reader.request(BufferedReader.MAX_BUFFERED_CHUNKS);
            }
        });
        return headersFuture;
    }

    /**
     * Writes the HEADERS frame of a fully buffered request and, once it succeeds, writes the
     * whole entity as a single end-of-stream DATA frame and flushes the channel.
     */
    private ChannelFuture writeRestRequest(ChannelHandlerContext ctx, RestRequest restRequest, int streamId, ChannelPromise promise) throws Exception {
        Http2ConnectionEncoder encoder = encoder();
        Http2Headers headers = NettyRequestAdapter.toHttp2Headers(restRequest);
        LOG.debug("Sent HTTP/2 HEADERS frame, stream={}, end={}, headers={}, padding={}bytes", streamId, NOT_END_STREAM, headers.size(), NO_PADDING);
        ChannelFuture headersFuture = encoder.writeHeaders(ctx, streamId, headers, NO_PADDING, NOT_END_STREAM, promise);
        headersFuture.addListener(future -> {
            if (future.isSuccess()) {
                ByteBuf data = Unpooled.wrappedBuffer(restRequest.getEntity().asByteBuffer());
                LOG.debug("Sent HTTP/2 DATA frame, stream={}, end={}, data={}bytes, padding={}bytes", streamId, END_STREAM, data.readableBytes(), NO_PADDING);
                encoder.writeData(ctx, streamId, data, NO_PADDING, END_STREAM, ctx.newPromise());
                ctx.channel().flush();
            }
        });
        return headersFuture;
    }

    /**
     * Stores the callback and pool handle as properties of the stream and adds a timeout task to
     * the handle that resets the stream.
     */
    private void registerStream(ChannelHandlerContext ctx, int streamId, TransportCallback callback, TimeoutAsyncPoolHandle handle) {
        Http2PipelinePropertyUtil.set(ctx, connection(), streamId, Http2ClientPipelineInitializer.CALLBACK_ATTR_KEY, callback);
        Http2PipelinePropertyUtil.set(ctx, connection(), streamId, Http2ClientPipelineInitializer.CHANNEL_POOL_HANDLE_ATTR_KEY, handle);
        handle.addTimeoutTask(() -> {
            LOG.debug("Reset stream upon timeout, stream={}", streamId);
            resetStream(ctx, streamId, Http2Error.CANCEL.code(), ctx.newPromise());
            ctx.flush();
        });
    }

    /**
     * Handles a failed HEADERS write: reports the cause to the callback, releases the pool handle,
     * and resets the stream if the connection knows about it.
     */
    private void failRequest(ChannelHandlerContext ctx, int streamId, Throwable cause, TransportCallback callback, TimeoutAsyncPoolHandle handle) {
        callback.onResponse(TransportResponseImpl.error(cause));
        handle.release();
        if (connection().stream(streamId) != null) {
            LOG.debug("Reset stream upon write failure, stream={}", streamId);
            resetStream(ctx, streamId, Http2Error.CANCEL.code(), ctx.newPromise());
            ctx.flush();
        }
    }

    /**
     * Logs the stream error, fails the request bound to the stream, and then always delegates to
     * the superclass handling, even if notifying the request throws.
     */
    @Override
    protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception.StreamException streamException) {
        int streamId = streamException.streamId();
        String message = String.format("HTTP/2 stream encountered an exception, stream=%d, remote=%s, channel=%s", streamId, ctx.channel().remoteAddress(), ctx.channel().id());
        LOG.error(message, cause);
        try {
            doOnStreamError(ctx, streamId, cause);
        }
        finally {
            super.onStreamError(ctx, outbound, cause, streamException);
        }
    }

    /**
     * Logs the connection error, resets every active stream and fails its request, flushes, and
     * then always delegates to the superclass handling. An {@link Http2Exception} raised while
     * visiting the streams is logged rather than propagated.
     */
    @Override
    protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception connectionError) {
        String message = String.format("HTTP/2 connection encountered an exception, streamCount=%d, remote=%s, channel=%s", connection().numActiveStreams(), ctx.channel().remoteAddress(), ctx.channel().id());
        LOG.error(message, cause);
        try {
            connection().forEachActiveStream(stream -> {
                resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
                doOnStreamError(ctx, stream.id(), cause);
                // Returning true keeps the visitor iterating over the remaining streams.
                return true;
            });
            ctx.flush();
        }
        catch (Http2Exception e) {
            LOG.error("Encountered exception while invoking request callbacks with errors", e);
        }
        finally {
            super.onConnectionError(ctx, outbound, cause, connectionError);
        }
    }

    /**
     * Removes the callback and pool handle stored on the stream, if any, then reports
     * {@code cause} to the callback and releases the handle.
     */
    private void doOnStreamError(ChannelHandlerContext ctx, int streamId, Throwable cause) {
        TransportCallback callback = (TransportCallback)Http2PipelinePropertyUtil.remove(ctx, connection(), streamId, Http2ClientPipelineInitializer.CALLBACK_ATTR_KEY);
        if (callback != null) {
            callback.onResponse(TransportResponseImpl.error(cause, Collections.emptyMap()));
        }
        TimeoutAsyncPoolHandle handle = (TimeoutAsyncPoolHandle)Http2PipelinePropertyUtil.remove(ctx, connection(), streamId, Http2ClientPipelineInitializer.CHANNEL_POOL_HANDLE_ATTR_KEY);
        if (handle != null) {
            handle.release();
        }
    }

    /**
     * Entity stream reader that writes each chunk as an HTTP/2 DATA frame on one stream and
     * flushes once {@link #FLUSH_THRESHOLD} bytes or {@link #MAX_BUFFERED_CHUNKS} chunks have
     * been written since the last flush.
     */
    private class BufferedReader
    implements Reader {
        private static final int MAX_BUFFERED_CHUNKS = 10;
        private static final int FLUSH_THRESHOLD = 8192;
        private final int _streamId;
        private final ChannelHandlerContext _ctx;
        private final Http2ConnectionEncoder _encoder;
        private final AsyncPoolHandle<?> _poolHandle;
        private volatile ReadHandle _readHandle;
        private int _notFlushedBytes;
        private int _notFlushedChunks;

        BufferedReader(ChannelHandlerContext ctx, Http2ConnectionEncoder encoder, int streamId, AsyncPoolHandle<?> poolHandle) {
            _streamId = streamId;
            _ctx = ctx;
            _encoder = encoder;
            _poolHandle = poolHandle;
            _notFlushedBytes = 0;
            _notFlushedChunks = 0;
        }

        /** Remembers the read handle used to request further chunks. */
        @Override
        public void onInit(ReadHandle rh) {
            _readHandle = rh;
        }

        /**
         * Writes the chunk as a non-final DATA frame, requests one more chunk when that write
         * completes, and flushes when either buffering limit is reached.
         */
        @Override
        public void onDataAvailable(ByteString data) {
            ByteBuf content = Unpooled.wrappedBuffer(data.asByteBuffer());
            _encoder.writeData(_ctx, _streamId, content, NO_PADDING, NOT_END_STREAM, _ctx.channel().newPromise())
                .addListener(future -> _readHandle.request(1));
            LOG.debug("Sent HTTP/2 DATA frame, stream={}, end={}, data={}bytes, padding={}bytes", _streamId, NOT_END_STREAM, content.readableBytes(), NO_PADDING);
            _notFlushedBytes += data.length();
            ++_notFlushedChunks;
            if (_notFlushedBytes >= FLUSH_THRESHOLD || _notFlushedChunks == MAX_BUFFERED_CHUNKS) {
                _ctx.channel().flush();
                _notFlushedBytes = 0;
                _notFlushedChunks = 0;
            }
        }

        /** Ends the stream with an empty end-of-stream DATA frame and flushes. */
        @Override
        public void onDone() {
            _encoder.writeData(_ctx, _streamId, Unpooled.EMPTY_BUFFER, NO_PADDING, END_STREAM, _ctx.channel().newPromise());
            LOG.debug("Sent HTTP/2 DATA frame, stream={}, end={}, data={}bytes, padding={}bytes", _streamId, END_STREAM, NO_DATA, NO_PADDING);
            _ctx.channel().flush();
        }

        /** Resets the stream with CANCEL, flushes the reset, and releases the pool handle. */
        @Override
        public void onError(Throwable cause) {
            Http2StreamCodec.this.resetStream(_ctx, _streamId, Http2Error.CANCEL.code(), _ctx.newPromise());
            // Flush so the reset is not left pending, matching the other resetStream call sites in this class.
            _ctx.flush();
            _poolHandle.release();
        }
    }
}

