/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.r2.netty.handler.common;

import com.linkedin.data.ByteString;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.StreamResponseBuilder;
import com.linkedin.r2.message.stream.entitystream.EntityStreams;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.message.stream.entitystream.Writer;
import com.linkedin.r2.netty.common.NettyChannelAttributes;
import com.linkedin.r2.netty.entitystream.StreamReader;
import com.linkedin.r2.netty.entitystream.StreamWriter;
import com.linkedin.r2.transport.common.WireAttributeHelper;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.client.stream.OrderedEntityStreamReader;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

@ChannelHandler.Sharable
public class ClientEntityStreamHandler
extends ChannelDuplexHandler {
    private final long _maxContentLength;

    public ClientEntityStreamHandler(long maxContentLength) {
        this._maxContentLength = maxContentLength;
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof StreamRequest) {
            StreamRequest request = (StreamRequest)msg;
            OrderedEntityStreamReader orderedReader = new OrderedEntityStreamReader(ctx, new StreamReader(ctx));
            ctx.write((Object)request).addListener(future -> request.getEntityStream().setReader((Reader)orderedReader));
        } else {
            ctx.write(msg);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof StreamResponseBuilder) {
            StreamResponseBuilder builder = (StreamResponseBuilder)msg;
            TreeMap headers = new TreeMap(String.CASE_INSENSITIVE_ORDER);
            headers.putAll(builder.getHeaders());
            Map wireAttrs = WireAttributeHelper.removeWireAttributes(headers);
            StreamWriter writer = new StreamWriter(ctx, this._maxContentLength);
            ctx.channel().attr(NettyChannelAttributes.RESPONSE_WRITER).set((Object)writer);
            StreamResponse response = ((StreamResponseBuilder)builder.unsafeSetHeaders(headers)).build(EntityStreams.newEntityStream((Writer)writer));
            TransportCallback callback = (TransportCallback)ctx.channel().attr(NettyChannelAttributes.RESPONSE_CALLBACK).getAndSet(null);
            if (callback != null) {
                callback.onResponse(TransportResponseImpl.success((Object)response, (Map)wireAttrs));
            }
        } else if (msg instanceof ByteString) {
            StreamWriter writer;
            StreamWriter streamWriter = writer = msg == StreamWriter.EOF ? (StreamWriter)((Object)ctx.channel().attr(NettyChannelAttributes.RESPONSE_WRITER).getAndSet(null)) : (StreamWriter)((Object)ctx.channel().attr(NettyChannelAttributes.RESPONSE_WRITER).get());
            if (writer != null) {
                writer.onDataAvailable((ByteString)msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.tryInvokeCallbackWithError(ctx, ClosedChannelException::new);
        this.tryNotifyWriterWithError(ctx, ClosedChannelException::new);
        ctx.fireChannelInactive();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        this.tryInvokeCallbackWithError(ctx, () -> cause);
        this.tryNotifyWriterWithError(ctx, () -> cause);
        ctx.fireExceptionCaught(cause);
    }

    private void tryInvokeCallbackWithError(ChannelHandlerContext ctx, Supplier<Throwable> causeSupplier) {
        TransportCallback callback = (TransportCallback)ctx.channel().attr(NettyChannelAttributes.RESPONSE_CALLBACK).getAndSet(null);
        if (callback != null) {
            callback.onResponse(TransportResponseImpl.error((Throwable)causeSupplier.get()));
        }
    }

    private void tryNotifyWriterWithError(ChannelHandlerContext ctx, Supplier<Throwable> causeSupplier) {
        StreamWriter writer = (StreamWriter)((Object)ctx.channel().attr(NettyChannelAttributes.RESPONSE_WRITER).getAndSet(null));
        if (writer != null) {
            writer.onError(causeSupplier.get());
        }
    }
}

