/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.r2.netty.client;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.MultiCallback;
import com.linkedin.common.util.None;
import com.linkedin.r2.message.Messages;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamRequestBuilder;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.message.timing.TimingImportance;
import com.linkedin.r2.message.timing.TimingKey;
import com.linkedin.r2.netty.callback.StreamExecutionCallback;
import com.linkedin.r2.netty.common.NettyChannelAttributes;
import com.linkedin.r2.netty.common.NettyClientState;
import com.linkedin.r2.netty.common.ShutdownTimeoutException;
import com.linkedin.r2.netty.common.StreamingTimeout;
import com.linkedin.r2.netty.common.UnknownSchemeException;
import com.linkedin.r2.netty.handler.common.SslHandshakeTimingHandler;
import com.linkedin.r2.transport.common.MessageType;
import com.linkedin.r2.transport.common.WireAttributeHelper;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.client.AsyncPool;
import com.linkedin.r2.transport.http.client.InvokedOnceTransportCallback;
import com.linkedin.r2.transport.http.client.common.ChannelPoolManager;
import com.linkedin.r2.transport.http.client.common.ssl.SslSessionValidator;
import com.linkedin.r2.transport.http.common.HttpBridge;
import com.linkedin.r2.transport.http.common.HttpProtocolVersion;
import com.linkedin.r2.util.Cancellable;
import com.linkedin.r2.util.RequestTimeoutUtil;
import com.linkedin.r2.util.Timeout;
import com.linkedin.util.ArgumentUtil;
import com.linkedin.util.clock.Clock;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TransportClient} implementation that sends REST and stream requests by acquiring a
 * Netty {@link Channel} from a per-address pool and writing the request to that channel.
 *
 * <p>Two {@link ChannelPoolManager}s are held: one used for requests whose URI scheme is
 * {@code https} and one for everything else. Every user callback is wrapped so that it is
 * invoked at most once and is tracked in {@link #_userCallbacks} until it completes; on
 * {@link #shutdown(Callback)} any callbacks still tracked are completed with a
 * {@link TimeoutException}.
 */
public class HttpNettyClient
implements TransportClient {
    private static final Logger LOG = LoggerFactory.getLogger(HttpNettyClient.class);
    /** Timing key marked immediately before and after address resolution in {@code sendRequest}. */
    private static final TimingKey TIMING_KEY = TimingKey.registerNewKey("dns_resolution_new", TimingImportance.LOW);
    private static final String HTTP_SCHEME = HttpScheme.HTTP.toString();
    private static final String HTTPS_SCHEME = HttpScheme.HTTPS.toString();
    private static final int HTTP_DEFAULT_PORT = 80;
    private static final int HTTPS_DEFAULT_PORT = 443;
    /** Sentinel meaning "streaming timeout disabled". */
    private static final int DEFAULT_STREAMING_TIMEOUT = -1;
    private final EventLoopGroup _eventLoopGroup;
    private final ScheduledExecutorService _scheduler;
    private final ExecutorService _callbackExecutor;
    private final ChannelPoolManager _channelPoolManager;
    private final ChannelPoolManager _sslChannelPoolManager;
    private final Clock _clock;
    private final HttpProtocolVersion _protocolVersion;
    private final long _requestTimeout;
    private final long _streamingTimeout;
    private final long _shutdownTimeout;
    private final AtomicReference<NettyClientState> _state;
    /** Callbacks of requests that have been accepted but have not yet received a response. */
    private final Set<TransportCallback<StreamResponse>> _userCallbacks = ConcurrentHashMap.newKeySet();

    /**
     * Creates a client in the {@link NettyClientState#RUNNING} state.
     *
     * @param eventLoopGroup        event loop group; must not be null
     * @param scheduler             scheduler used for request and streaming timeouts; must not be null
     * @param callbackExecutor      executor handed to {@link StreamExecutionCallback}; must not be null
     * @param channelPoolManager    pool manager used for non-https requests; must not be null
     * @param sslChannelPoolManager pool manager used for https requests; must not be null
     * @param protocolVersion       value stored in the request context under {@code HTTP_PROTOCOL_VERSION}
     * @param clock                 clock handed to {@link StreamingTimeout}; must not be null
     * @param requestTimeout        default request timeout in milliseconds; must be {@code >= 0}
     * @param streamingTimeout      streaming timeout, or {@code -1} to disable; a value greater than
     *                              or equal to {@code requestTimeout} is also treated as disabled
     * @param shutdownTimeout       timeout passed to both pool managers on shutdown; must be {@code >= 0}
     */
    public HttpNettyClient(EventLoopGroup eventLoopGroup, ScheduledExecutorService scheduler, ExecutorService callbackExecutor, ChannelPoolManager channelPoolManager, ChannelPoolManager sslChannelPoolManager, HttpProtocolVersion protocolVersion, Clock clock, long requestTimeout, long streamingTimeout, long shutdownTimeout) {
        ArgumentUtil.notNull(eventLoopGroup, "eventLoopGroup");
        ArgumentUtil.notNull(scheduler, "scheduler");
        ArgumentUtil.notNull(callbackExecutor, "callbackExecutor");
        ArgumentUtil.notNull(channelPoolManager, "channelPoolManager");
        ArgumentUtil.notNull(sslChannelPoolManager, "sslChannelPoolManager");
        ArgumentUtil.notNull(clock, "clock");
        ArgumentUtil.checkArgument(requestTimeout >= 0L, "requestTimeout");
        ArgumentUtil.checkArgument(streamingTimeout >= DEFAULT_STREAMING_TIMEOUT, "streamingTimeout");
        ArgumentUtil.checkArgument(shutdownTimeout >= 0L, "shutdownTimeout");
        _eventLoopGroup = eventLoopGroup;
        _scheduler = scheduler;
        _callbackExecutor = callbackExecutor;
        _channelPoolManager = channelPoolManager;
        _sslChannelPoolManager = sslChannelPoolManager;
        _clock = clock;
        _protocolVersion = protocolVersion;
        _requestTimeout = requestTimeout;
        // A streaming timeout that is not shorter than the request timeout is switched off.
        _streamingTimeout = streamingTimeout >= requestTimeout ? DEFAULT_STREAMING_TIMEOUT : streamingTimeout;
        _shutdownTimeout = shutdownTimeout;
        _state = new AtomicReference<>(NettyClientState.RUNNING);
    }

    /**
     * Sends a REST request; the REST callback is adapted to a stream callback before dispatch.
     */
    @Override
    public void restRequest(RestRequest request, RequestContext requestContext, Map<String, String> wireAttrs, TransportCallback<RestResponse> callback) {
        sendRequest(request, requestContext, wireAttrs, Messages.toStreamTransportCallback(callback));
    }

    /**
     * Sends a stream request. When the request context carries a true {@code IS_FULL_REQUEST}
     * attribute the stream is first converted to a {@link RestRequest}; otherwise it is sent as is.
     */
    @Override
    public void streamRequest(StreamRequest request, RequestContext requestContext, Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
        if (isFullRequest(requestContext)) {
            sendStreamRequestAsRestRequest(request, requestContext, wireAttrs, callback);
        } else {
            sendRequest(request, requestContext, wireAttrs, callback);
        }
    }

    /**
     * Shuts down both channel pool managers. Only the first call (the one that moves the state
     * from RUNNING to SHUTTING_DOWN) performs the shutdown; later calls fail the supplied
     * callback with an {@link IllegalStateException}.
     *
     * <p>Once both pool managers have reported back, every callback still tracked in
     * {@link #_userCallbacks} is completed with a {@link TimeoutException} and then
     * {@code callback} is notified.
     *
     * @param callback notified with the outcome of the pool shutdown
     */
    @Override
    public void shutdown(final Callback<None> callback) {
        LOG.info("Shutdown requested");
        if (_state.compareAndSet(NettyClientState.RUNNING, NettyClientState.SHUTTING_DOWN)) {
            LOG.info("Shutting down");
            // The wrapped callback is constructed with a count of 2: one per pool manager.
            final MultiCallback poolShutdown = new MultiCallback(new Callback<None>() {

                private void releaseCallbacks() {
                    _userCallbacks.forEach(transportCallback -> transportCallback.onResponse(TransportResponseImpl.error(new TimeoutException("Operation did not complete before shutdown"))));
                }

                @Override
                public void onError(Throwable e) {
                    releaseCallbacks();
                    callback.onError(e);
                }

                @Override
                public void onSuccess(None result) {
                    releaseCallbacks();
                    callback.onSuccess(result);
                }
            }, 2);
            _channelPoolManager.shutdown(poolShutdown, () -> _state.set(NettyClientState.REQUESTS_STOPPING), () -> _state.set(NettyClientState.SHUTDOWN), _shutdownTimeout);
            _sslChannelPoolManager.shutdown(poolShutdown, () -> _state.set(NettyClientState.REQUESTS_STOPPING), () -> _state.set(NettyClientState.SHUTDOWN), _shutdownTimeout);
        } else {
            callback.onError(new IllegalStateException("Shutdown has already been requested."));
        }
        // NOTE(review): TIMING_KEY is static, yet it is unregistered on every shutdown call of
        // every instance - confirm this is intended when several clients coexist.
        TimingKey.unregisterKey(TIMING_KEY);
    }

    /**
     * Converts the stream request to a {@link RestRequest} and sends the result; a conversion
     * failure is reported directly to {@code callback}.
     */
    private void sendStreamRequestAsRestRequest(StreamRequest request, final RequestContext requestContext, final Map<String, String> wireAttrs, final TransportCallback<StreamResponse> callback) {
        Messages.toRestRequest(request, new Callback<RestRequest>() {

            @Override
            public void onError(Throwable e) {
                callback.onResponse(TransportResponseImpl.error(e));
            }

            @Override
            public void onSuccess(RestRequest restRequest) {
                HttpNettyClient.this.sendRequest(restRequest, requestContext, wireAttrs, callback);
            }
        });
    }

    /**
     * Returns true when the context's {@code IS_FULL_REQUEST} local attribute is present and true.
     */
    private static boolean isFullRequest(RequestContext requestContext) {
        final Object isFullRequest = requestContext.getLocalAttr("IS_FULL_REQUEST");
        return isFullRequest != null && (Boolean) isFullRequest;
    }

    /**
     * Core dispatch path shared by REST and stream requests: decorates the callback, arms the
     * request timeout, resolves the remote address, attaches wire attributes as headers, picks
     * the channel pool and asks it for a channel.
     *
     * <p>Any failure before the pool is asked for a channel is reported through the decorated
     * callback, and the already-armed timeout is cancelled so it does not fire afterwards.
     */
    private void sendRequest(Request request, RequestContext requestContext, Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
        final TransportCallback<StreamResponse> decoratedCallback = decorateUserCallback(request, callback);
        if (_state.get() != NettyClientState.RUNNING) {
            decoratedCallback.onResponse(TransportResponseImpl.error(new IllegalStateException("Client is not running")));
            return;
        }

        final long resolvedRequestTimeout = resolveRequestTimeout(requestContext, _requestTimeout);
        // Armed before address resolution so the callback is completed even if no channel is
        // ever obtained; ChannelPoolGetCallback.onSuccess disarms it once a channel is in hand.
        final Timeout<None> timeout = new Timeout<>(_scheduler, resolvedRequestTimeout, TimeUnit.MILLISECONDS, None.none());
        timeout.addTimeoutTask(() -> decoratedCallback.onResponse(TransportResponseImpl.error(new TimeoutException("Exceeded request timeout of " + resolvedRequestTimeout + "ms"))));

        final SocketAddress address;
        try {
            TimingContextUtil.markTiming(requestContext, TIMING_KEY);
            address = resolveAddress(request, requestContext);
            TimingContextUtil.markTiming(requestContext, TIMING_KEY);
        }
        catch (Exception e) {
            failBeforeDispatch(timeout, decoratedCallback, e);
            return;
        }

        // Declared as Request because the REST branch yields a RestRequest, not a StreamRequest.
        final Request requestWithWireAttrHeaders;
        if (request instanceof StreamRequest) {
            requestWithWireAttrHeaders = buildRequestWithWireAttributes((StreamRequest) request, wireAttrs);
        } else {
            MessageType.setMessageType(MessageType.Type.REST, wireAttrs);
            requestWithWireAttrHeaders = buildRequestWithWireAttributes((RestRequest) request, wireAttrs);
        }

        final AsyncPool<Channel> pool;
        try {
            pool = getChannelPoolManagerPerRequest(requestWithWireAttrHeaders).getPoolForAddress(address);
        }
        catch (IllegalStateException e) {
            failBeforeDispatch(timeout, decoratedCallback, e);
            return;
        }

        requestContext.putLocalAttr("HTTP_PROTOCOL_VERSION", _protocolVersion);
        final Cancellable pendingGet = pool.get(new ChannelPoolGetCallback(pool, requestWithWireAttrHeaders, requestContext, decoratedCallback, timeout, resolvedRequestTimeout, _streamingTimeout));
        if (pendingGet != null) {
            // If the timeout fires while still waiting for a channel, abandon the pending get.
            timeout.addTimeoutTask(pendingGet::cancel);
        }
    }

    /**
     * Completes {@code callback} with {@code cause} after disarming {@code timeout}, so that the
     * timeout task does not stay scheduled and re-invoke an already completed callback.
     * Uses {@code getItem()} for disarming in the same way ChannelPoolGetCallback.onSuccess does.
     */
    private static void failBeforeDispatch(Timeout<None> timeout, TransportCallback<StreamResponse> callback, Throwable cause) {
        timeout.getItem();
        callback.onResponse(TransportResponseImpl.error(cause));
    }

    /** Rebuilds the stream request with the wire attributes written over its headers. */
    private StreamRequest buildRequestWithWireAttributes(StreamRequest request, Map<String, String> wireAttrs) {
        return ((StreamRequestBuilder) request.builder().overwriteHeaders(WireAttributeHelper.toWireAttributes(wireAttrs))).build(request.getEntityStream());
    }

    /** Rebuilds the REST request with the wire attributes written over its headers. */
    private RestRequest buildRequestWithWireAttributes(RestRequest request, Map<String, String> wireAttrs) {
        return ((RestRequestBuilder) new RestRequestBuilder(request).overwriteHeaders(WireAttributeHelper.toWireAttributes(wireAttrs))).build();
    }

    /**
     * Wraps the user callback, innermost first, with the HTTP bridge callback, the
     * executor-backed callback and the shutdown-aware (tracked, invoke-once) callback.
     */
    private TransportCallback<StreamResponse> decorateUserCallback(Request request, TransportCallback<StreamResponse> callback) {
        final TransportCallback<StreamResponse> httpCallback = HttpBridge.streamToHttpCallback(callback, request);
        final TransportCallback<StreamResponse> executionCallback = getExecutionCallback(httpCallback);
        return getShutdownAwareCallback(executionCallback);
    }

    /** Wraps {@code callback} in a {@link StreamExecutionCallback} bound to the callback executor. */
    private TransportCallback<StreamResponse> getExecutionCallback(TransportCallback<StreamResponse> callback) {
        return new StreamExecutionCallback(_callbackExecutor, callback);
    }

    /**
     * Registers an invoke-once wrapper of {@code callback} in {@link #_userCallbacks} and returns
     * a callback that deregisters the wrapper before forwarding the response to it.
     */
    private TransportCallback<StreamResponse> getShutdownAwareCallback(TransportCallback<StreamResponse> callback) {
        final TransportCallback<StreamResponse> onceTransportCallback = new InvokedOnceTransportCallback<>(callback);
        _userCallbacks.add(onceTransportCallback);
        return response -> {
            _userCallbacks.remove(onceTransportCallback);
            onceTransportCallback.onResponse(response);
        };
    }

    /** Selects the SSL pool manager for https requests and the plain one otherwise. */
    private ChannelPoolManager getChannelPoolManagerPerRequest(Request request) {
        return isSslRequest(request) ? _sslChannelPoolManager : _channelPoolManager;
    }

    /**
     * Returns true when the request URI scheme is https. The comparison ignores case so that it
     * agrees with {@link #resolveAddress(Request, RequestContext)}, which also accepts the scheme
     * case-insensitively; otherwise an upper-case scheme would resolve to port 443 but be sent
     * through the non-SSL pool.
     */
    private static boolean isSslRequest(Request request) {
        return HTTPS_SCHEME.equalsIgnoreCase(request.getURI().getScheme());
    }

    /**
     * Computes the effective request timeout for a single request.
     *
     * @param context        request context; a {@code REQUEST_TIMEOUT} local attribute, if present,
     *                       replaces {@code requestTimeout}, and a {@code PREEMPTIVE_TIMEOUT_RATE}
     *                       local attribute, if present, is then applied to the result
     * @param requestTimeout fallback timeout used when the context carries no override
     * @return the resolved timeout
     */
    public static long resolveRequestTimeout(RequestContext context, long requestTimeout) {
        long resolvedRequestTimeout = requestTimeout;
        final Number requestTimeoutRaw = (Number) context.getLocalAttr("REQUEST_TIMEOUT");
        if (requestTimeoutRaw != null) {
            resolvedRequestTimeout = requestTimeoutRaw.longValue();
        }
        final Double preemptiveTimeoutRate = (Double) context.getLocalAttr("PREEMPTIVE_TIMEOUT_RATE");
        if (preemptiveTimeoutRate != null) {
            resolvedRequestTimeout = RequestTimeoutUtil.applyPreemptiveTimeoutRate(resolvedRequestTimeout, preemptiveTimeoutRate);
        }
        return resolvedRequestTimeout;
    }

    /**
     * Resolves the socket address a request should be sent to and records the resolved IP
     * address and port in the request context under {@code REMOTE_SERVER_ADDR} and
     * {@code REMOTE_SERVER_PORT}.
     *
     * @param request        request whose URI supplies scheme, host and port
     * @param requestContext context that receives the resolved address and port
     * @return the resolved address; when the URI has no explicit port, 80 is used for http and
     *         443 for https
     * @throws UnknownSchemeException if the scheme is neither http nor https (case-insensitive)
     * @throws UnknownHostException   if the URI has no host component or the host cannot be resolved
     */
    public static SocketAddress resolveAddress(Request request, RequestContext requestContext) throws UnknownHostException, UnknownSchemeException {
        final URI uri = request.getURI();
        final String scheme = uri.getScheme();
        final boolean isHttp = HTTP_SCHEME.equalsIgnoreCase(scheme);
        if (!isHttp && !HTTPS_SCHEME.equalsIgnoreCase(scheme)) {
            throw new UnknownSchemeException("Unknown scheme: " + scheme + " (only http/https is supported)");
        }
        final String host = uri.getHost();
        if (host == null || host.isEmpty()) {
            // InetAddress.getByName(null) yields the loopback address, which would silently
            // redirect the request to the local machine; reject the URI instead.
            throw new UnknownHostException("Unable to determine host from URI: " + uri);
        }
        int port = uri.getPort();
        if (port == -1) {
            port = isHttp ? HTTP_DEFAULT_PORT : HTTPS_DEFAULT_PORT;
        }
        final InetAddress inetAddress = InetAddress.getByName(host);
        final InetSocketAddress address = new InetSocketAddress(inetAddress, port);
        requestContext.putLocalAttr("REMOTE_SERVER_ADDR", inetAddress.getHostAddress());
        requestContext.putLocalAttr("REMOTE_SERVER_PORT", port);
        return address;
    }

    /**
     * Callback handed to the channel pool. On success it prepares the channel's attributes,
     * schedules the request timeout on the channel and writes the request; on failure it
     * reports the error to the request callback.
     */
    private class ChannelPoolGetCallback
    implements Callback<Channel> {
        private final AsyncPool<Channel> _pool;
        private final Request _request;
        private final RequestContext _requestContext;
        private final TransportCallback<StreamResponse> _callback;
        private final Timeout<None> _timeout;
        private final long _resolvedRequestTimeout;
        private final long _streamingTimeout;

        ChannelPoolGetCallback(AsyncPool<Channel> pool, Request request, RequestContext requestContext, TransportCallback<StreamResponse> callback, Timeout<None> timeout, long resolvedRequestTimeout, long streamingTimeout) {
            _pool = pool;
            _request = request;
            _requestContext = requestContext;
            _callback = callback;
            _timeout = timeout;
            _resolvedRequestTimeout = resolvedRequestTimeout;
            _streamingTimeout = streamingTimeout;
        }

        @Override
        public void onSuccess(Channel channel) {
            // Result intentionally ignored: the call is made for its effect on the pre-dispatch
            // timeout; from here on timeouts are delivered through the channel pipeline.
            _timeout.getItem();

            channel.attr(NettyChannelAttributes.CHANNEL_POOL).set(_pool);
            final TransportCallback<StreamResponse> sslTimingCallback = SslHandshakeTimingHandler.getSslTimingCallback(channel, _requestContext, _callback);
            channel.attr(NettyChannelAttributes.RESPONSE_CALLBACK).set(sslTimingCallback);
            final SslSessionValidator sslSessionValidator = (SslSessionValidator) _requestContext.getLocalAttr("REQUESTED_SSL_SESSION_VALIDATOR");
            channel.attr(NettyChannelAttributes.SSL_SESSION_VALIDATOR).set(sslSessionValidator);

            // Requests are still let through while SHUTTING_DOWN; they are only rejected once
            // a pool manager has moved the state to REQUESTS_STOPPING or SHUTDOWN.
            final NettyClientState state = HttpNettyClient.this._state.get();
            if (state == NettyClientState.REQUESTS_STOPPING || state == NettyClientState.SHUTDOWN) {
                channel.pipeline().fireExceptionCaught(new ShutdownTimeoutException("Operation did not complete before shutdown"));
                return;
            }

            final ScheduledFuture<ChannelPipeline> timeoutFuture = HttpNettyClient.this._scheduler.schedule(() -> channel.pipeline().fireExceptionCaught(new TimeoutException("Exceeded request timeout of " + _resolvedRequestTimeout + "ms")), _resolvedRequestTimeout, TimeUnit.MILLISECONDS);
            if (isStreamingTimeoutEnabled()) {
                final StreamingTimeout streamingTimeout = new StreamingTimeout(HttpNettyClient.this._scheduler, _streamingTimeout, channel, HttpNettyClient.this._clock);
                channel.attr(NettyChannelAttributes.STREAMING_TIMEOUT_FUTURE).set(streamingTimeout);
            }
            channel.attr(NettyChannelAttributes.TIMEOUT_FUTURE).set(timeoutFuture);
            channel.writeAndFlush(_request).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        }

        /** True when a streaming timeout other than the disabled sentinel was configured. */
        private boolean isStreamingTimeoutEnabled() {
            return _streamingTimeout > DEFAULT_STREAMING_TIMEOUT;
        }

        @Override
        public void onError(Throwable e) {
            // No channel was obtained, so the pre-dispatch timeout has nothing left to guard.
            _timeout.getItem();
            _callback.onResponse(TransportResponseImpl.error(e));
        }
    }
}

