/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.r2.netty.entitystream;

import com.linkedin.data.ByteString;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.netty.common.NettyChannelAttributes;
import com.linkedin.r2.netty.common.StreamingTimeout;
import io.netty.channel.ChannelHandlerContext;

public class StreamReader
implements Reader {
    public static final ByteString EOF = ByteString.copy((byte[])new byte[0]);
    private static final int REQUEST_CHUNKS = 1;
    private static final int MAX_BUFFERED_CHUNKS = 8;
    private static final int FLUSH_THRESHOLD = 8192;
    private final ChannelHandlerContext _ctx;
    private int _notFlushedBytes;
    private int _notFlushedChunks;
    private volatile ReadHandle _rh;

    public StreamReader(ChannelHandlerContext ctx) {
        this._ctx = ctx;
    }

    public void onInit(ReadHandle rh) {
        this._rh = rh;
        this.refreshStreamLastActiveTime();
        this._rh.request(8);
    }

    public void onDataAvailable(ByteString data) {
        this.refreshStreamLastActiveTime();
        this._ctx.write((Object)data).addListener(future -> this._rh.request(1));
        this._notFlushedBytes += data.length();
        ++this._notFlushedChunks;
        if (this._notFlushedBytes >= 8192 || this._notFlushedChunks == 8) {
            this._ctx.flush();
            this._notFlushedBytes = 0;
            this._notFlushedChunks = 0;
        }
    }

    public void onDone() {
        this._ctx.writeAndFlush((Object)EOF);
    }

    public void onError(Throwable e) {
        this._ctx.fireExceptionCaught(e);
    }

    private void refreshStreamLastActiveTime() {
        StreamingTimeout idleTimeout = (StreamingTimeout)this._ctx.channel().attr(NettyChannelAttributes.STREAMING_TIMEOUT_FUTURE).get();
        if (idleTimeout != null) {
            idleTimeout.refreshLastActiveTime();
        }
    }
}

