/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.util.SecureExecutor;
import com.datatorrent.stram.webapp.OperatorInfo;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingContainerParent
extends CompositeService
implements StreamingContainerUmbilicalProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerParent.class);
    private Server server;
    private SecretManager<? extends TokenIdentifier> tokenSecretManager = null;
    private InetSocketAddress address;
    private final StreamingContainerManager dagManager;
    private final int listenerThreadCount;

    public StreamingContainerParent(String name, StreamingContainerManager dnodeMgr, SecretManager<? extends TokenIdentifier> secretManager, int listenerThreadCount) {
        super(name);
        this.dagManager = dnodeMgr;
        this.tokenSecretManager = secretManager;
        this.listenerThreadCount = listenerThreadCount;
    }

    public void init(Configuration conf) {
        super.init(conf);
    }

    public void start() {
        this.startRpcServer();
        super.start();
    }

    public void stop() {
        this.stopRpcServer();
        super.stop();
    }

    protected void startRpcServer() {
        Configuration conf = this.getConfig();
        LOG.info("Config: " + conf);
        LOG.info("Listener thread count " + this.listenerThreadCount);
        try {
            this.server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance((Object)this).setBindAddress("0.0.0.0").setPort(0).setNumHandlers(this.listenerThreadCount).setSecretManager(this.tokenSecretManager).setVerbose(false).build();
            if (conf.getBoolean("hadoop.security.authorization", false)) {
                this.server.refreshServiceAcl(conf, new PolicyProvider(){

                    public Service[] getServices() {
                        return new Service[]{new Service(StreamingContainerUmbilicalProtocol.class.getName(), StreamingContainerUmbilicalProtocol.class)};
                    }
                });
            }
            this.server.start();
            this.address = NetUtils.getConnectAddress((Server)this.server);
            LOG.info("Container callback server listening at " + this.address);
        }
        catch (IOException e) {
            throw new YarnRuntimeException((Throwable)e);
        }
    }

    protected void stopRpcServer() {
        this.server.stop();
    }

    public InetSocketAddress getAddress() {
        return this.address;
    }

    public int getListenerThreadCount() {
        return this.listenerThreadCount;
    }

    void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) {
        this.server.refreshServiceAcl(configuration, policyProvider);
    }

    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
        return ProtocolSignature.getProtocolSignature((VersionedProtocol)this, (String)protocol, (long)clientVersion, (int)clientMethodsHash);
    }

    public long getProtocolVersion(String arg0, long arg1) throws IOException {
        return 201208081755L;
    }

    @Override
    public void log(String containerId, String msg) throws IOException {
        LOG.info("child msg: {} context: {}", (Object)msg, (Object)this.dagManager.getContainerAgent((String)containerId).container);
    }

    @Override
    public void reportError(String containerId, int[] operators, String msg) {
        if (operators == null || operators.length == 0) {
            this.dagManager.recordEventAsync(new StramEvent.ContainerErrorEvent(containerId, msg));
        } else {
            for (int operator : operators) {
                OperatorInfo operatorInfo = this.dagManager.getOperatorInfo(operator);
                if (operatorInfo == null) continue;
                this.dagManager.recordEventAsync(new StramEvent.OperatorErrorEvent(operatorInfo.name, operator, containerId, msg));
            }
        }
        try {
            this.log(containerId, msg);
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Override
    public StreamingContainerUmbilicalProtocol.StreamingContainerContext getInitContext(String containerId) throws IOException {
        StreamingContainerAgent sca = this.dagManager.getContainerAgent(containerId);
        return sca.getInitContext();
    }

    @Override
    public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat msg) {
        long now = System.currentTimeMillis();
        if (msg.sentTms - now > 50L) {
            LOG.warn("Child container heartbeat sent time for {} ({}) is greater than the receive timestamp in AM ({}). Please make sure the clocks are in sync", new Object[]{msg.getContainerId(), msg.sentTms, now});
        }
        this.dagManager.updateRPCLatency(msg.getContainerId(), now - msg.sentTms);
        try {
            final StreamingContainerUmbilicalProtocol.ContainerHeartbeat fmsg = msg;
            return SecureExecutor.execute(new SecureExecutor.WorkLoad<StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse>(){

                @Override
                public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse run() {
                    return StreamingContainerParent.this.dagManager.processHeartbeat(fmsg);
                }
            });
        }
        catch (IOException ex) {
            LOG.error("Error processing heartbeat", (Throwable)ex);
            return null;
        }
    }
}

