/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.cloud.starlight.transport.channel;

import com.baidu.cloud.starlight.api.exception.StarlightRpcException;
import com.baidu.cloud.starlight.api.exception.TransportException;
import com.baidu.cloud.starlight.api.model.MsgBase;
import com.baidu.cloud.starlight.api.model.Response;
import com.baidu.cloud.starlight.api.rpc.LocalContext;
import com.baidu.cloud.starlight.api.rpc.callback.RpcCallback;
import com.baidu.cloud.starlight.api.transport.channel.ChannelAttribute;
import com.baidu.cloud.starlight.api.transport.channel.ChannelSide;
import com.baidu.cloud.starlight.api.transport.channel.RpcChannel;
import com.baidu.cloud.starlight.api.transport.channel.RpcChannelGroup;
import com.baidu.cloud.starlight.transport.channel.NettyRpcChannelGroup;
import com.baidu.cloud.thirdparty.netty.channel.Channel;
import com.baidu.cloud.thirdparty.netty.channel.ChannelFuture;
import com.baidu.cloud.thirdparty.netty.channel.ChannelFutureListener;
import com.baidu.cloud.thirdparty.netty.handler.codec.http.HttpHeaderNames;
import com.baidu.cloud.thirdparty.netty.handler.codec.http.HttpHeaderValues;
import com.baidu.cloud.thirdparty.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LongRpcChannel
implements RpcChannel {
    protected static final Logger LOGGER = LoggerFactory.getLogger(NettyRpcChannelGroup.class);
    private final Map<Long, RpcCallback> callbacks;
    private final Map<String, Object> attributes;
    private volatile Channel channel;
    private final ChannelSide side;
    private RpcChannelGroup channelGroup;
    private AtomicBoolean inited = new AtomicBoolean(false);

    public LongRpcChannel(Channel channel, ChannelSide side) {
        this(channel, side, null);
    }

    public LongRpcChannel(Channel channel, ChannelSide side, RpcChannelGroup channelGroup) {
        this.callbacks = new ConcurrentHashMap<Long, RpcCallback>();
        this.attributes = new ConcurrentHashMap<String, Object>();
        this.channel = channel;
        this.side = side;
        this.channelGroup = channelGroup;
        this.init();
    }

    @Override
    public void init() {
        if (!this.inited.compareAndSet(false, true)) {
            return;
        }
        ChannelAttribute attribute = new ChannelAttribute(this);
        this.channel.attr(RpcChannel.ATTRIBUTE_KEY).set((Object)attribute);
    }

    @Override
    public Channel channel() {
        return this.channel;
    }

    @Override
    public ChannelSide side() {
        return this.side;
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        if (this.channel == null) {
            return null;
        }
        return (InetSocketAddress)this.channel.remoteAddress();
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        if (this.channel == null) {
            return null;
        }
        return (InetSocketAddress)this.channel.localAddress();
    }

    @Override
    public boolean isActive() {
        return this.channel != null && this.channel.isActive() && this.inited.get();
    }

    @Override
    public void reconnect() throws TransportException {
        if (this.getRpcChannelGroup() != null) {
            this.getRpcChannelGroup().removeRpcChannel(this);
        }
    }

    @Override
    public void send(final MsgBase msgBase) throws TransportException {
        if (!this.isActive()) {
            throw new TransportException(TransportException.WRITE_EXCEPTION, "Channel is inactive, remoteHost: " + this.getRemoteAddress().getAddress() + ", remotePort: " + this.getRemoteAddress().getPort());
        }
        ChannelFuture sendFuture = this.channel.writeAndFlush((Object)msgBase);
        sendFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    RpcCallback callback;
                    String causeByMsg = channelFuture.cause() == null ? "netty exception" : channelFuture.cause().getMessage();
                    String errorMsg = String.format("Fail to send message to remote, url[%s:%s], error message: %s", LongRpcChannel.this.getRemoteAddress().getAddress(), LongRpcChannel.this.getRemoteAddress().getPort(), causeByMsg);
                    if (LongRpcChannel.this.side().equals((Object)ChannelSide.CLIENT) && (callback = LongRpcChannel.this.removeCallback(msgBase.getId())) != null) {
                        callback.onError(new TransportException(TransportException.WRITE_EXCEPTION, errorMsg));
                    }
                    LOGGER.warn(errorMsg);
                }
                if (LongRpcChannel.this.side().equals((Object)ChannelSide.SERVER) && msgBase.getProtocolName().equalsIgnoreCase("springrest") && LongRpcChannel.this.isHttpShortConnection(msgBase)) {
                    channelFuture.channel().close();
                }
            }
        });
    }

    @Override
    public void receive(MsgBase msgBase) {
    }

    @Override
    public void putCallback(long id, RpcCallback callback) {
        if (this.side().equals((Object)ChannelSide.SERVER)) {
            throw new StarlightRpcException("RpcChannel side is SERVER, not support putCallback. RequestId: " + id);
        }
        this.callbacks.putIfAbsent(id, callback);
    }

    @Override
    public RpcCallback removeCallback(long id) {
        if (this.side().equals((Object)ChannelSide.SERVER)) {
            throw new StarlightRpcException("RpcChannel side is SERVER, not support removeCallback. RequestId: " + id);
        }
        return this.callbacks.remove(id);
    }

    @Override
    public void close() {
        if (!this.inited.getAndSet(false)) {
            return;
        }
        if (this.callbacks.size() > 0) {
            LOGGER.warn("There are still unfinished requests when RpcChannel close, size {}, channelId {}, remoteAddress {}, will wait and handle this requests.", new Object[]{this.callbacks.size(), this.channel.id().asLongText(), this.channel.remoteAddress()});
            this.clearCallbacks();
        }
        LocalContext.getContext("thread.classloader").set(this.channel.id().asLongText(), null);
        if (this.channel != null) {
            LOGGER.info("The netty channel is closing, channel {}, remoteAddress {}", (Object)this.channel.id().asLongText(), (Object)this.channel.remoteAddress());
            this.channel.close();
        }
    }

    @Override
    public void setAttribute(String attributeKey, Object attributeVal) {
        this.attributes.put(attributeKey, attributeVal);
    }

    @Override
    public Object getAttribute(String attributeKey) {
        return this.attributes.get(attributeKey);
    }

    private boolean isHttpShortConnection(MsgBase msgBase) {
        if (msgBase == null) {
            return false;
        }
        if (!(msgBase instanceof Response)) {
            return false;
        }
        Response response = (Response)msgBase;
        if (response.getRequest().getAttachmentKv() == null || response.getRequest().getAttachmentKv().size() == 0) {
            return false;
        }
        Object connection = response.getRequest().getAttachmentKv().get(HttpHeaderNames.CONNECTION.toString());
        if (!(connection instanceof String)) {
            return false;
        }
        return ((String)connection).equalsIgnoreCase(HttpHeaderValues.CLOSE.toString());
    }

    public String toString() {
        return this.channel.id().asLongText();
    }

    @Override
    public RpcChannelGroup getRpcChannelGroup() {
        return this.channelGroup;
    }

    @Override
    public Map<Long, RpcCallback> allCallbacks() {
        return this.callbacks;
    }

    protected void clearCallbacks() {
        long startClearTime = System.currentTimeMillis();
        while (true) {
            if (this.callbacks.size() <= 0) {
                LOGGER.info("The channel has handled all request, will close, channelId {}, remoteAddr {}", (Object)this.channel.id().asLongText(), (Object)this.channel.remoteAddress());
                break;
            }
            if (System.currentTimeMillis() - startClearTime > 180000L) {
                LOGGER.error("The request has not been processed after waiting 3 minutes when closing channel. Unhandled request size {}, will response timeout", (Object)this.callbacks.size());
                for (RpcCallback callback : this.callbacks.values()) {
                    callback.onError(StarlightRpcException.timeoutException(callback.getRequest(), this.getRemoteAddress().getAddress().getHostAddress() + ":" + this.getRemoteAddress().getPort()));
                }
                this.callbacks.clear();
                break;
            }
            try {
                TimeUnit.SECONDS.sleep(10L);
            }
            catch (InterruptedException e) {
                LOGGER.error("Thread interrupted when clearCallbacks, remind callback size {}, will response timeout", (Object)this.callbacks.size());
            }
        }
    }
}

