/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.r2.transport.http.client.common;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.MultiCallback;
import com.linkedin.common.util.None;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.Response;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.message.timing.TimingImportance;
import com.linkedin.r2.message.timing.TimingKey;
import com.linkedin.r2.netty.client.HttpNettyClient;
import com.linkedin.r2.netty.common.NettyClientState;
import com.linkedin.r2.netty.common.UnknownSchemeException;
import com.linkedin.r2.transport.common.MessageType;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.client.AbstractJmxManager;
import com.linkedin.r2.transport.http.client.InvokedOnceTransportCallback;
import com.linkedin.r2.transport.http.client.PoolStats;
import com.linkedin.r2.transport.http.client.PoolStatsProvider;
import com.linkedin.r2.transport.http.client.TimeoutTransportCallback;
import com.linkedin.r2.transport.http.client.common.ChannelPoolFactory;
import com.linkedin.r2.transport.http.client.common.ChannelPoolManager;
import com.linkedin.r2.transport.http.client.common.ChannelPoolManagerImpl;
import com.linkedin.r2.transport.http.common.HttpBridge;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Netty-backed {@link TransportClient} implementations.
 *
 * <p>It implements the parts of the request lifecycle that do not depend on the wire protocol:
 * decorating the caller's callback so that it is tracked until completion and bounded by a
 * request timeout, checking the client state, resolving the destination address, and then
 * handing the request to {@link #doWriteRequest}. It also coordinates shutdown of the plain and
 * SSL channel pool managers.
 *
 * @param <Req> request type handed to {@link #doWriteRequest}
 * @param <Res> response type delivered to transport callbacks
 */
public abstract class AbstractNettyClient<Req extends Request, Res extends Response>
implements TransportClient {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractNettyClient.class);

    /** Timing key marked immediately before and after address resolution in {@code writeRequest}. */
    private static final TimingKey TIMING_KEY = TimingKey.registerNewKey("dns_resolution", TimingImportance.LOW);

    private final ChannelPoolManager _channelPoolManager;
    private final ChannelPoolManager _sslChannelPoolManager;

    /** Lifecycle state; starts as RUNNING and is advanced by {@link #shutdown(Callback)}. */
    protected final AtomicReference<NettyClientState> _state = new AtomicReference<>(NettyClientState.RUNNING);
    protected final ScheduledExecutorService _scheduler;

    /** Default request timeout, passed to {@code HttpNettyClient.resolveRequestTimeout} for each request. */
    private final long _requestTimeout;
    private final long _shutdownTimeout;
    private final AbstractJmxManager _jmxManager;

    /** Callbacks of requests that have been accepted but not yet completed; drained on shutdown. */
    private final Set<TransportCallback<Res>> _userCallbacks = ConcurrentHashMap.newKeySet();

    /**
     * Creates a client with separate pool managers for plain and SSL requests and registers both
     * with the given JMX manager.
     *
     * @param executor scheduler used for request timeouts
     * @param requestTimeout default request timeout (used as milliseconds when building the timeout callback)
     * @param shutdownTimeout timeout passed to the pool managers on shutdown
     * @param jmxManager receives create/shutdown notifications for both pool managers
     * @param channelPoolManager pool manager used for non-{@code https} requests
     * @param sslChannelPoolManager pool manager used for {@code https} requests
     */
    public AbstractNettyClient(ScheduledExecutorService executor, long requestTimeout, long shutdownTimeout, AbstractJmxManager jmxManager, ChannelPoolManager channelPoolManager, ChannelPoolManager sslChannelPoolManager) {
        _scheduler = executor;
        _requestTimeout = requestTimeout;
        _shutdownTimeout = shutdownTimeout;
        _jmxManager = jmxManager;
        _channelPoolManager = channelPoolManager;
        _sslChannelPoolManager = sslChannelPoolManager;
        _jmxManager.onProviderCreate(_channelPoolManager);
        _jmxManager.onProviderCreate(_sslChannelPoolManager);
    }

    /**
     * Creates a client backed by a single {@link ChannelPoolManagerImpl} that serves both plain
     * and SSL requests, using {@link AbstractJmxManager#NULL_JMX_MANAGER}.
     *
     * @param factory factory handed to the pool manager for creating channel pools
     * @param executor scheduler used for request timeouts and by the pool manager
     * @param requestTimeout default request timeout (used as milliseconds when building the timeout callback)
     * @param shutdownTimeout timeout passed to the pool manager on shutdown
     */
    public AbstractNettyClient(ChannelPoolFactory factory, ScheduledExecutorService executor, int requestTimeout, int shutdownTimeout) {
        _scheduler = executor;
        _requestTimeout = requestTimeout;
        _shutdownTimeout = shutdownTimeout;
        _jmxManager = AbstractJmxManager.NULL_JMX_MANAGER;
        DefaultChannelGroup allChannels = new DefaultChannelGroup("R2 client channels", GlobalEventExecutor.INSTANCE);
        // The same manager instance backs both fields in this configuration.
        ChannelPoolManager sharedManager = new ChannelPoolManagerImpl(factory, allChannels, _scheduler);
        _channelPoolManager = sharedManager;
        _sslChannelPoolManager = sharedManager;
        _jmxManager.onProviderCreate(_channelPoolManager);
    }

    /**
     * Wraps the caller's callback with whatever execution policy the concrete client requires.
     * The returned callback is the one that ultimately receives the response or error.
     *
     * @param callback the callback supplied for the request
     * @return the callback to invoke in its place
     */
    protected abstract TransportCallback<Res> getExecutionCallback(TransportCallback<Res> callback);

    /**
     * Writes the request to the resolved address. Called only after the state check and address
     * resolution in this class have succeeded.
     *
     * @param request the request to send
     * @param requestContext context associated with the request
     * @param address resolved destination address
     * @param wireAttrs wire attributes for the request
     * @param callback timeout-guarded callback that must receive the outcome
     * @param requestTimeout resolved timeout for this request
     */
    protected abstract void doWriteRequest(Req request, RequestContext requestContext, SocketAddress address, Map<String, String> wireAttrs, TimeoutTransportCallback<Res> callback, long requestTimeout);

    /**
     * Sends a REST request: tags the wire attributes with the REST message type and forwards the
     * request with a callback adapted by {@link HttpBridge#restToHttpCallback}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void restRequest(RestRequest request, RequestContext requestContext, Map<String, String> wireAttrs, TransportCallback<RestResponse> callback) {
        MessageType.setMessageType(MessageType.Type.REST, wireAttrs);
        // Unchecked: the concrete subclass decides Req/Res; this entry point is only meaningful
        // when they are compatible with RestRequest/RestResponse.
        writeRequest((Req) request, requestContext, wireAttrs, (TransportCallback<Res>) HttpBridge.restToHttpCallback(callback, request));
    }

    /**
     * Sends a stream request: tags the wire attributes with the REST message type and forwards
     * the request with a callback adapted by {@link HttpBridge#streamToHttpCallback}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void streamRequest(StreamRequest request, RequestContext requestContext, Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
        MessageType.setMessageType(MessageType.Type.REST, wireAttrs);
        // Unchecked: see restRequest; Req/Res must be compatible with StreamRequest/StreamResponse.
        writeRequest((Req) request, requestContext, wireAttrs, (TransportCallback<Res>) HttpBridge.streamToHttpCallback(callback, request));
    }

    /**
     * Registers the callback in {@link #_userCallbacks} so that shutdown can complete it, and
     * returns a wrapper that deregisters it before forwarding the response.
     *
     * @param callback callback to track
     * @return wrapper to be used in place of {@code callback}
     */
    private TransportCallback<Res> getShutdownAwareCallback(TransportCallback<Res> callback) {
        TransportCallback<Res> trackedCallback = new InvokedOnceTransportCallback<>(callback);
        _userCallbacks.add(trackedCallback);
        return response -> {
            _userCallbacks.remove(trackedCallback);
            trackedCallback.onResponse(response);
        };
    }

    /**
     * Common request path: decorates the callback, verifies the client is RUNNING, resolves the
     * destination address and delegates to {@link #doWriteRequest}.
     *
     * <p>Every outcome, including the early failures below, is reported through the decorated
     * callback chain. Reporting early failures on the raw {@code callback} would leave the
     * tracked wrapper registered in {@link #_userCallbacks} and the timeout callback pending, so
     * the caller could be completed a second time later.
     */
    private void writeRequest(Req request, RequestContext requestContext, Map<String, String> wireAttrs, TransportCallback<Res> callback) {
        TransportCallback<Res> executionCallback = getExecutionCallback(callback);
        TransportCallback<Res> shutdownAwareCallback = getShutdownAwareCallback(executionCallback);
        long requestTimeout = HttpNettyClient.resolveRequestTimeout(requestContext, _requestTimeout);
        // NOTE(review): assumes TimeoutTransportCallback cancels its pending timeout once
        // onResponse has been invoked on it - confirm against its implementation.
        TimeoutTransportCallback<Res> timeoutCallback = new TimeoutTransportCallback<>(_scheduler, requestTimeout, TimeUnit.MILLISECONDS, shutdownAwareCallback, "Exceeded request timeout of " + requestTimeout + "ms");

        NettyClientState state = _state.get();
        if (state != NettyClientState.RUNNING) {
            errorResponse(timeoutCallback, new IllegalStateException("Client is " + state));
            return;
        }

        SocketAddress address;
        try {
            // The timing key is marked on both sides of the resolution call.
            TimingContextUtil.markTiming(requestContext, TIMING_KEY);
            address = HttpNettyClient.resolveAddress(request, requestContext);
            TimingContextUtil.markTiming(requestContext, TIMING_KEY);
        }
        catch (UnknownSchemeException | UnknownHostException e) {
            errorResponse(timeoutCallback, e);
            return;
        }

        doWriteRequest(request, requestContext, address, wireAttrs, timeoutCallback, requestTimeout);
    }

    /** Returns {@code true} when the request URI scheme is exactly {@code https}. */
    private static boolean isSslRequest(Request request) {
        return "https".equals(request.getURI().getScheme());
    }

    /**
     * Selects the pool manager for a request based on its URI scheme.
     *
     * @param request the request about to be sent
     * @return the SSL pool manager for {@code https} requests, otherwise the plain pool manager
     */
    protected ChannelPoolManager getChannelPoolManagerPerRequest(Request request) {
        return isSslRequest(request) ? _sslChannelPoolManager : _channelPoolManager;
    }

    /**
     * Shuts the client down. Only the first call, made while the client is RUNNING, starts the
     * shutdown; any other call fails the callback with an {@link IllegalStateException}.
     *
     * <p>Both pool managers are asked to shut down and report to a shared {@link MultiCallback}
     * created with a count of 2. When that callback completes, every callback still held in
     * {@link #_userCallbacks} is failed with a {@link TimeoutException} before the caller's
     * callback is notified.
     *
     * @param callback notified with the outcome of the shutdown
     */
    @Override
    public final void shutdown(final Callback<None> callback) {
        LOG.info("Shutdown requested");
        if (!_state.compareAndSet(NettyClientState.RUNNING, NettyClientState.SHUTTING_DOWN)) {
            callback.onError(new IllegalStateException("Shutdown has already been requested."));
            return;
        }

        LOG.info("Shutting down");
        MultiCallback poolShutdown = new MultiCallback(new Callback<None>() {

            /** Fails every request callback that is still outstanding. */
            private void releaseCallbacks() {
                _userCallbacks.forEach(transportCallback -> transportCallback.onResponse(TransportResponseImpl.error(new TimeoutException("Operation did not complete before shutdown"))));
            }

            @Override
            public void onError(Throwable e) {
                releaseCallbacks();
                callback.onError(e);
            }

            @Override
            public void onSuccess(None result) {
                releaseCallbacks();
                callback.onSuccess(result);
            }
        }, 2);

        // Each manager receives hooks that move the client state to REQUESTS_STOPPING and SHUTDOWN.
        _channelPoolManager.shutdown(poolShutdown, () -> _state.set(NettyClientState.REQUESTS_STOPPING), () -> _state.set(NettyClientState.SHUTDOWN), _shutdownTimeout);
        _sslChannelPoolManager.shutdown(poolShutdown, () -> _state.set(NettyClientState.REQUESTS_STOPPING), () -> _state.set(NettyClientState.SHUTDOWN), _shutdownTimeout);
        _jmxManager.onProviderShutdown(_channelPoolManager);
        _jmxManager.onProviderShutdown(_sslChannelPoolManager);
        TimingKey.unregisterKey(TIMING_KEY);
    }

    /**
     * Delivers {@code e} to the callback as an error response.
     *
     * @param callback callback to notify
     * @param e the failure to report
     * @param <T> response type of the callback
     */
    public static <T> void errorResponse(TransportCallback<T> callback, Throwable e) {
        callback.onResponse(TransportResponseImpl.error(e));
    }

    /**
     * Converts a {@link Throwable} to an {@link Exception}.
     *
     * @param t the throwable to convert
     * @return {@code t} itself when it already is an {@code Exception}; otherwise a new
     *         {@code Exception} with message {@code "Wrapped Throwable"} and {@code t} as cause
     */
    public static Exception toException(Throwable t) {
        if (t instanceof Exception) {
            return (Exception) t;
        }
        return new Exception("Wrapped Throwable", t);
    }

    /**
     * Returns the pool statistics reported by the plain channel pool manager. The SSL pool
     * manager is not consulted here.
     */
    public final Map<String, PoolStats> getPoolStats() {
        return _channelPoolManager.getPoolStats();
    }
}

