/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.impl.service.client.async;

import com.ning.http.client.AsyncHandler;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.Response;
import com.ning.http.client.providers.grizzly.GrizzlyResponseHeaders;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.glassfish.grizzly.http.HttpResponsePacket;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.impl.service.client.HttpResponseCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AsyncHandler} that completes a {@link CompletableFuture} with the HTTP response before the
 * whole body has necessarily arrived.
 *
 * <p>If the first body part received is also the last one, the body is accumulated in memory and the
 * future is completed with it. Otherwise a {@link PipedOutputStream}/{@link PipedInputStream} pair is
 * created, the future is completed with the piped input stream as the body, and every body part is
 * written to the pipe as it arrives.
 *
 * <p>The pipe size is either the value given to the constructor or, when that value is negative, a
 * size derived from the response headers (see {@link #calculateBufferSize(HttpResponseHeaders)}).
 */
public class ResponseBodyDeferringAsyncHandler
implements AsyncHandler<Response> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResponseBodyDeferringAsyncHandler.class);
    /**
     * Reflective handle on the private {@code response} field of {@link GrizzlyResponseHeaders}, or
     * {@code null} when it could not be obtained (the buffer size optimization is then skipped).
     */
    private static final Field responseField;
    private volatile Response response;
    /** Size in bytes of the piped stream buffer; a negative value means "resolve from the headers". */
    private int bufferSize;
    /** Write end of the pipe; stays {@code null} while no pipe has been created. */
    private PipedOutputStream output;
    /** Read end of the pipe; empty while no pipe has been created. */
    private Optional<InputStream> input = Optional.empty();
    private final CompletableFuture<HttpResponse> future;
    private final Response.ResponseBuilder responseBuilder = new Response.ResponseBuilder();
    private final HttpResponseCreator httpResponseCreator = new HttpResponseCreator();
    /** Set once the future has been completed (normally or exceptionally) by this handler. */
    private final AtomicBoolean handled = new AtomicBoolean(false);

    /**
     * @param future future to complete with the response, or exceptionally on failure
     * @param userDefinedBufferSize size in bytes of the piped stream buffer; a negative value makes the
     *        handler resolve the size from the response headers
     * @throws IOException declared for compatibility with existing callers; not thrown by this constructor
     */
    public ResponseBodyDeferringAsyncHandler(CompletableFuture<HttpResponse> future, int userDefinedBufferSize) throws IOException {
        this.future = future;
        this.bufferSize = userDefinedBufferSize;
    }

    /**
     * Closes the pipe (if any) and, when the future has not been completed yet, completes it
     * exceptionally. If the future was already completed the error can only be logged.
     *
     * @param t the failure reported for this exchange
     */
    public void onThrowable(Throwable t) {
        try {
            this.closeOut();
        }
        catch (IOException e) {
            LOGGER.debug("Error closing HTTP response stream", (Throwable)e);
        }
        if (!this.handled.getAndSet(true)) {
            this.future.completeExceptionally(ResponseBodyDeferringAsyncHandler.toReportedException(t));
            return;
        }
        // The future is already completed, so the failure cannot be propagated through it anymore.
        String message = t.getMessage();
        if (message != null && message.contains("Pipe closed")) {
            LOGGER.warn("HTTP response stream was closed before being read but response streams must always be consumed.");
        } else {
            LOGGER.warn("Error handling HTTP response stream. Set log level to DEBUG for details.");
        }
        LOGGER.debug("HTTP response stream error was ", t);
    }

    /**
     * Maps a failure to the exception used to complete the future: {@link TimeoutException} and
     * {@link IOException} are passed through, anything else is wrapped in an {@link IOException}
     * that keeps the original as its cause.
     */
    private static Exception toReportedException(Throwable t) {
        if (t instanceof TimeoutException) {
            return (TimeoutException)t;
        }
        if (t instanceof IOException) {
            return (IOException)t;
        }
        return new IOException(t.getMessage(), t);
    }

    /**
     * Resets the response builder and records the status line.
     *
     * @param responseStatus the received status
     * @return always {@link AsyncHandler.STATE#CONTINUE}
     */
    public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
        this.responseBuilder.reset();
        this.responseBuilder.accumulate(responseStatus);
        return AsyncHandler.STATE.CONTINUE;
    }

    /**
     * Records the headers and, when no buffer size was configured (negative value), derives one
     * from them.
     *
     * @param headers the received headers
     * @return always {@link AsyncHandler.STATE#CONTINUE}
     */
    public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
        this.responseBuilder.accumulate(headers);
        if (this.bufferSize < 0) {
            LOGGER.debug("onHeadersReceived. No configured buffer size, resolving buffer size dynamically.");
            this.calculateBufferSize(headers);
        } else {
            LOGGER.debug("onHeadersReceived. Using user configured buffer size of '{} bytes'.", (Object)this.bufferSize);
        }
        return AsyncHandler.STATE.CONTINUE;
    }

    /**
     * Resolves {@link #bufferSize} from the response headers.
     *
     * <p>When a usable, positive {@code Content-Length} is present and no {@code Transfer-Encoding}
     * is set, the size is the smaller of the content length and the connection read buffer size
     * (or {@link TCPNIOTransport#MAX_RECEIVE_BUFFER_SIZE} when the connection cannot be inspected).
     * In every other case, including a malformed header, the default size is used.
     *
     * @param headers the received response headers
     */
    private void calculateBufferSize(HttpResponseHeaders headers) {
        int maxBufferSize = TCPNIOTransport.MAX_RECEIVE_BUFFER_SIZE;
        String contentLength = headers.getHeaders().getFirstValue("Content-Length");
        long contentLengthValue = -1L;
        if (!StringUtils.isEmpty((CharSequence)contentLength) && StringUtils.isEmpty((CharSequence)headers.getHeaders().getFirstValue("Transfer-Encoding"))) {
            contentLengthValue = ResponseBodyDeferringAsyncHandler.parseContentLength(contentLength);
        }
        if (contentLengthValue > 0L) {
            try {
                if (responseField != null && headers instanceof GrizzlyResponseHeaders) {
                    maxBufferSize = ((HttpResponsePacket)responseField.get(headers)).getRequest().getConnection().getReadBufferSize();
                }
            }
            catch (IllegalAccessException e) {
                LOGGER.debug("Unable to access connection buffer size.");
            }
            // Compare as long so a body larger than Integer.MAX_VALUE is clamped instead of overflowing.
            int calculated = (int)Math.min((long)maxBufferSize, contentLengthValue);
            // PipedInputStream rejects a non-positive pipe size, so never keep one.
            this.bufferSize = calculated > 0 ? calculated : ResponseBodyDeferringAsyncHandler.defaultBufferSize();
        } else {
            this.bufferSize = ResponseBodyDeferringAsyncHandler.defaultBufferSize();
        }
        LOGGER.debug("Max buffer size = {} bytes, Connection buffer size = {} bytes, Content-length = {} bytes, Calculated buffer size = {} bytes", new Object[]{TCPNIOTransport.MAX_RECEIVE_BUFFER_SIZE, maxBufferSize, contentLength, this.bufferSize});
    }

    /**
     * Parses a {@code Content-Length} header value.
     *
     * @return the parsed length, or {@code -1} when the value is not a valid number
     */
    private static long parseContentLength(String contentLength) {
        try {
            return Long.parseLong(contentLength.trim());
        }
        catch (NumberFormatException e) {
            LOGGER.debug("Ignoring invalid Content-Length header value '{}'.", (Object)contentLength);
            return -1L;
        }
    }

    /** Buffer size used when it cannot be derived from the response headers. */
    private static int defaultBufferSize() {
        return DataUnit.KB.toBytes(32) + 10;
    }

    /**
     * Handles a chunk of the response body.
     *
     * <p>If this is the first chunk and also the last one, it is accumulated in memory and the
     * future is completed. Otherwise the pipe is created on the first chunk, the future is
     * completed with the read end of the pipe, and the chunk is written to the write end.
     *
     * @param bodyPart the received body chunk
     * @return {@link AsyncHandler.STATE#CONTINUE}, or {@link AsyncHandler.STATE#ABORT} when writing
     *         to the pipe fails
     */
    public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
        if (!this.input.isPresent()) {
            if (bodyPart.isLast()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Single part (size = {}bytes).", (Object)bodyPart.getBodyByteBuffer().remaining());
                }
                this.responseBuilder.accumulate(bodyPart);
                this.handleIfNecessary();
                return AsyncHandler.STATE.CONTINUE;
            }
            this.output = new PipedOutputStream();
            this.input = Optional.of(new PipedInputStream(this.output, this.bufferSize));
        }
        if (LOGGER.isDebugEnabled()) {
            int bodyLength = bodyPart.getBodyByteBuffer().remaining();
            LOGGER.debug("Multiple parts (part size = {} bytes, PipedInputStream buffer size = {} bytes).", (Object)bodyLength, (Object)this.bufferSize);
            int unread = this.input.get().available();
            if (this.bufferSize - unread < bodyLength) {
                // The write below cannot complete until the consumer reads from the pipe.
                LOGGER.debug("SELECTOR BLOCKED! No room in piped stream to write {} bytes immediately. There are still has {} bytes unread", (Object)bodyLength, (Object)unread);
            }
        }
        // Complete the future before writing so a consumer can start draining the pipe.
        this.handleIfNecessary();
        try {
            bodyPart.writeTo((OutputStream)this.output);
        }
        catch (IOException e) {
            this.onThrowable(e);
            return AsyncHandler.STATE.ABORT;
        }
        return AsyncHandler.STATE.CONTINUE;
    }

    /**
     * Flushes and closes the write end of the pipe, if one was created. The stream is closed even
     * when the flush fails.
     *
     * @throws IOException if flushing or closing fails
     */
    protected void closeOut() throws IOException {
        if (this.output == null) {
            return;
        }
        try {
            this.output.flush();
        }
        finally {
            this.output.close();
        }
    }

    /**
     * Completes the future if that has not happened yet (e.g. a response without body parts) and
     * closes the pipe.
     *
     * @return always {@code null}; the response is delivered through the future instead
     * @throws IOException if closing the pipe fails
     */
    public Response onCompleted() throws IOException {
        this.handleIfNecessary();
        this.closeOut();
        return null;
    }

    /**
     * Builds the response and completes the future with it, at most once per handler. The body is
     * the piped input stream when a pipe exists, otherwise the body accumulated in the builder.
     */
    private void handleIfNecessary() {
        if (this.handled.getAndSet(true)) {
            return;
        }
        this.response = this.responseBuilder.build();
        try {
            // Only build the in-memory body stream when there is no pipe to read from.
            InputStream body = this.input.isPresent() ? this.input.get() : this.response.getResponseBodyAsStream();
            this.future.complete(this.httpResponseCreator.create(this.response, body));
        }
        catch (IOException e) {
            this.future.completeExceptionally(e);
        }
    }

    static {
        Field field = null;
        try {
            field = GrizzlyResponseHeaders.class.getDeclaredField("response");
            field.setAccessible(true);
        }
        catch (Throwable e) {
            // Deliberately broad: failing to set up this optimization must not break class initialization.
            field = null;
            LOGGER.warn("Unable to use reflection to access connection buffer size to optimize streaming.", e);
        }
        responseField = field;
    }
}

