/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.http.nio.netty.internal;

import com.typesafe.netty.HandlerSubscriber;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import java.nio.ByteBuffer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotation.ReviewBeforeRelease;

public class NettyHttpContentSubscriber
extends HandlerSubscriber<HttpContent> {
    private final Channel channel;

    public NettyHttpContentSubscriber(Channel channel) {
        super(channel.eventLoop());
        this.channel = channel;
    }

    @Override
    protected void complete() {
        EventLoop executor = this.channel.eventLoop();
        executor.execute(() -> this.channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> this.removeFromPipeline()));
    }

    @Override
    protected void error(Throwable error) {
        this.removeFromPipeline();
    }

    private void removeFromPipeline() {
        this.channel.pipeline().remove(this);
    }

    public Subscriber<ByteBuffer> adapt() {
        return new SubscriberAdapter(this);
    }

    private class SubscriberAdapter
    implements Subscriber<ByteBuffer> {
        private final Subscriber<HttpContent> subscriber;

        private SubscriberAdapter(Subscriber<HttpContent> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.subscriber.onSubscribe(s);
        }

        @Override
        @ReviewBeforeRelease(value="Unpooled vs channel.alloc? channel.alloc seems to be preferred but we should test this more thoroughly under high concurrency.")
        public void onNext(ByteBuffer data) {
            ByteBuf buffer = NettyHttpContentSubscriber.this.channel.alloc().buffer(data.limit());
            buffer.writeBytes(data);
            DefaultHttpContent content = new DefaultHttpContent(buffer);
            this.subscriber.onNext(content);
        }

        @Override
        public void onError(Throwable t) {
            this.subscriber.onError(t);
        }

        @Override
        public void onComplete() {
            this.subscriber.onComplete();
        }
    }
}

