/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.stram.FSRecoveryHandler;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoverableRpcProxy
implements InvocationHandler,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);
    public static final String RPC_TIMEOUT = "com.datatorrent.stram.rpc.timeout";
    public static final String RETRY_TIMEOUT = "com.datatorrent.stram.rpc.retry.timeout";
    public static final String RETRY_DELAY = "com.datatorrent.stram.rpc.delay.timeout";
    public static final String QP_retryTimeoutMillis = "retryTimeoutMillis";
    public static final String QP_retryDelayMillis = "retryDelayMillis";
    public static final String QP_rpcTimeout = "rpcTimeout";
    private static final int RETRY_TIMEOUT_DEFAULT = 30000;
    private static final int RETRY_DELAY_DEFAULT = 10000;
    private static final int RPC_TIMEOUT_DEFAULT = 5000;
    private final Configuration conf;
    private final String appPath;
    private StreamingContainerUmbilicalProtocol umbilical;
    private String lastConnectURI;
    private long lastCompletedCallTms;
    private long retryTimeoutMillis = Long.getLong("com.datatorrent.stram.rpc.retry.timeout", 30000L);
    private long retryDelayMillis = Long.getLong("com.datatorrent.stram.rpc.delay.timeout", 10000L);
    private int rpcTimeout = Integer.getInteger("com.datatorrent.stram.rpc.timeout", 5000);

    public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException {
        this.conf = conf;
        this.appPath = appPath;
        this.connect();
    }

    private void connect() throws IOException {
        FSRecoveryHandler fsrh = new FSRecoveryHandler(this.appPath, this.conf);
        String uriStr = fsrh.readConnectUri();
        if (!uriStr.equals(this.lastConnectURI)) {
            LOG.debug("Got new RPC connect address {}", (Object)uriStr);
            this.lastCompletedCallTms = System.currentTimeMillis();
            this.lastConnectURI = uriStr;
        }
        URI heartbeatUri = URI.create(uriStr);
        String queryStr = heartbeatUri.getQuery();
        List queryList = null;
        if (queryStr != null) {
            queryList = URLEncodedUtils.parse((String)queryStr, (Charset)Charset.defaultCharset());
        }
        if (queryList != null) {
            for (NameValuePair pair : queryList) {
                String value = pair.getValue();
                String key = pair.getName();
                if (QP_rpcTimeout.equals(key)) {
                    this.rpcTimeout = Integer.parseInt(value);
                    continue;
                }
                if (QP_retryTimeoutMillis.equals(key)) {
                    this.retryTimeoutMillis = Long.parseLong(value);
                    continue;
                }
                if (!QP_retryDelayMillis.equals(key)) continue;
                this.retryDelayMillis = Long.parseLong(value);
            }
        }
        InetSocketAddress address = NetUtils.createSocketAddrForHost((String)heartbeatUri.getHost(), (int)heartbeatUri.getPort());
        this.umbilical = (StreamingContainerUmbilicalProtocol)RPC.getProxy(StreamingContainerUmbilicalProtocol.class, (long)201208081755L, (InetSocketAddress)address, (UserGroupInformation)UserGroupInformation.getCurrentUser(), (Configuration)this.conf, (SocketFactory)NetUtils.getDefaultSocketFactory((Configuration)this.conf), (int)this.rpcTimeout);
    }

    public StreamingContainerUmbilicalProtocol getProxy() {
        StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)Proxy.newProxyInstance(this.umbilical.getClass().getClassLoader(), this.umbilical.getClass().getInterfaces(), (InvocationHandler)this);
        return recoverableProxy;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws ConnectException, SocketTimeoutException, InterruptedException, IllegalAccessException {
        while (true) {
            try {
                if (this.umbilical == null) {
                    this.connect();
                }
                Object result = method.invoke((Object)this.umbilical, args);
                this.lastCompletedCallTms = System.currentTimeMillis();
                return result;
            }
            catch (InvocationTargetException e) {
                Throwable targetException = e.getTargetException();
                long connectMillis = System.currentTimeMillis() - this.lastCompletedCallTms;
                if (connectMillis < this.retryTimeoutMillis) {
                    LOG.warn("RPC failure, attempting reconnect after {} ms (remaining {} ms)", new Object[]{this.retryDelayMillis, this.retryTimeoutMillis - connectMillis, targetException});
                    this.close();
                    Thread.sleep(this.retryDelayMillis);
                    continue;
                }
                LOG.error("Giving up RPC connection recovery after {} ms", (Object)connectMillis, (Object)targetException);
                if (targetException instanceof ConnectException) {
                    throw (ConnectException)targetException;
                }
                if (targetException instanceof SocketTimeoutException) {
                    throw (SocketTimeoutException)targetException;
                }
                throw Throwables.propagate((Throwable)targetException);
            }
            catch (IOException ex) {
                this.close();
                throw new RuntimeException(ex);
            }
            break;
        }
    }

    @Override
    public void close() {
        LOG.debug("Closing RPC connection {}", (Object)this.lastConnectURI);
        if (this.umbilical != null) {
            RPC.stopProxy((Object)this.umbilical);
            this.umbilical = null;
        }
    }

    public static URI toConnectURI(InetSocketAddress address) throws Exception {
        int rpcTimeoutMillis = Integer.getInteger(RPC_TIMEOUT, 5000);
        long retryDelayMillis = Long.getLong(RETRY_DELAY, 10000L);
        long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, 30000L);
        return RecoverableRpcProxy.toConnectURI(address, rpcTimeoutMillis, retryDelayMillis, retryTimeoutMillis);
    }

    public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, long retryDelayMillis, long retryTimeoutMillis) throws Exception {
        return new URIBuilder().setScheme("stram").setHost(address.getHostName()).setPort(address.getPort()).setParameter(QP_rpcTimeout, Integer.toString(rpcTimeoutMillis)).setParameter(QP_retryDelayMillis, Long.toString(retryDelayMillis)).setParameter(QP_retryTimeoutMillis, Long.toString(retryTimeoutMillis)).build();
    }
}

