/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.netty.impl.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import java.io.IOException;
import java.io.InputStream;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.service.http.netty.impl.client.ReactorNettyClient;
import org.mule.service.http.netty.impl.streaming.StatusCallback;
import org.mule.service.http.netty.impl.streaming.StreamingEntitySender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientExpectContinueHandler
extends ChannelDuplexHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientExpectContinueHandler.class);
    private static final int CHUNK_SIZE = 8192;
    private final ExecutorService ioExecutor;
    private boolean receiving100ContinueResponse = false;
    private boolean suppressChannelReadComplete = false;

    public ClientExpectContinueHandler(ExecutorService ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        HttpResponse res;
        if (msg instanceof HttpResponse && (res = (HttpResponse)msg).status().code() == 100) {
            LOGGER.debug("Received 100 Continue response {}", msg);
            this.receiving100ContinueResponse = true;
            this.suppressChannelReadComplete = true;
        } else if (msg instanceof LastHttpContent && this.receiving100ContinueResponse) {
            HttpEntity requestEntity = (HttpEntity)ctx.channel().attr(ReactorNettyClient.REQUEST_ENTITY_KEY).get();
            LOGGER.debug("Request entity from the channel is {}", (Object)requestEntity);
            if (requestEntity != null) {
                this.writeEntityToChannel(ctx, requestEntity);
                LOGGER.debug("Streaming request entity payload to channel");
            } else {
                LOGGER.warn("No request entity found for 100 Continue response");
            }
            this.receiving100ContinueResponse = false;
            return;
        }
        if (this.receiving100ContinueResponse) {
            return;
        }
        ctx.fireChannelRead(msg);
    }

    private void writeEntityToChannel(ChannelHandlerContext ctx, HttpEntity requestEntity) throws IOException {
        if (requestEntity.isStreaming()) {
            this.sendStreamingEntity(ctx, requestEntity);
        } else {
            this.sendNonStreamingEntity(ctx, requestEntity);
        }
    }

    private void sendStreamingEntity(ChannelHandlerContext ctx, HttpEntity requestEntity) throws IOException {
        StreamingEntitySender entitySender = new StreamingEntitySender(requestEntity, ctx, () -> LOGGER.debug("Starting to write chunk to channel context"), new EntitySenderStatusCallback(), this.ioExecutor);
        entitySender.sendNextChunk();
    }

    private void sendNonStreamingEntity(ChannelHandlerContext ctx, HttpEntity requestEntity) {
        try (InputStream inputStream = this.validateInputStream(requestEntity);){
            this.streamEntityChunks(ctx, inputStream);
        }
        catch (IOException e) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)"Failed to read content from HttpEntity:"), (Throwable)e);
        }
    }

    private InputStream validateInputStream(HttpEntity requestEntity) throws IOException {
        InputStream inputStream = requestEntity.getContent();
        if (inputStream == null) {
            throw new MuleRuntimeException((Throwable)new IOException("InputStream is null"));
        }
        return inputStream;
    }

    private void streamEntityChunks(ChannelHandlerContext ctx, InputStream inputStream) throws IOException {
        int bytesRead;
        byte[] buffer = new byte[8192];
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            ByteBuf chunk = Unpooled.wrappedBuffer((byte[])buffer, (int)0, (int)bytesRead);
            ctx.write((Object)new DefaultHttpContent(chunk));
        }
        ctx.writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT);
        LOGGER.debug("Successfully streamed HttpEntity in chunks");
    }

    public void channelReadComplete(ChannelHandlerContext ctx) {
        if (this.suppressChannelReadComplete) {
            this.suppressChannelReadComplete = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        } else {
            ctx.fireChannelReadComplete();
        }
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.shouldAdaptRequest(ctx, msg)) {
            this.handleAdaptedRequest(ctx, (HttpRequest)msg, promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }

    private boolean shouldAdaptRequest(ChannelHandlerContext ctx, Object msg) {
        return ctx.channel().hasAttr(ReactorNettyClient.REQUEST_ENTITY_KEY) && msg instanceof HttpRequest;
    }

    private void handleAdaptedRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, ChannelPromise promise) {
        HttpEntity httpEntity = (HttpEntity)ctx.channel().attr(ReactorNettyClient.REQUEST_ENTITY_KEY).get();
        if (httpEntity != null) {
            HttpRequest adaptedRequest = this.adaptRequest(httpRequest, httpEntity.getBytesLength());
            ctx.write((Object)adaptedRequest, promise);
        }
    }

    private HttpRequest adaptRequest(HttpRequest httpRequest, OptionalLong bytesLength) {
        HttpHeaders headers = httpRequest.headers();
        if (headers.contains((CharSequence)HttpHeaderNames.CONTENT_LENGTH)) {
            if (bytesLength.isPresent()) {
                LOGGER.debug("Setting the actual content-length header to the request");
                headers.set((CharSequence)HttpHeaderNames.CONTENT_LENGTH, (Object)bytesLength.getAsLong());
            } else {
                LOGGER.debug("Setting Transfer-Encoding=chunked header");
                headers.remove((CharSequence)HttpHeaderNames.CONTENT_LENGTH);
                headers.add((CharSequence)HttpHeaderNames.TRANSFER_ENCODING, (Object)HttpHeaderValues.CHUNKED);
            }
        }
        if (httpRequest instanceof LastHttpContent) {
            LOGGER.debug("LastHttpContent received, writing DefaultHttpRequest to the channel");
            return new DefaultHttpRequest(httpRequest.protocolVersion(), httpRequest.method(), httpRequest.uri(), headers);
        }
        return httpRequest;
    }

    private static class EntitySenderStatusCallback
    implements StatusCallback {
        private EntitySenderStatusCallback() {
        }

        @Override
        public void onFailure(Throwable exception) {
            this.logStreamingError(exception);
        }

        @Override
        public void onSuccess() {
            LOGGER.debug("Request sent successfully to server in chunks.");
        }

        private void logStreamingError(Throwable exception) {
            LOGGER.warn("Error while sending streaming request to server: {}", (Object)exception.getMessage());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Exception thrown while sending streaming request to server", exception);
            }
        }
    }
}

