/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB;
import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop service that hosts the two RPC endpoints of an LLAP daemon: the daemon
 * protocol ({@link LlapProtocolBlockingPB}) and the management protocol
 * ({@link LlapManagementProtocolPB}).
 *
 * <p>Daemon protocol calls are delegated to the {@link ContainerRunner} supplied at
 * construction time. The management protocol call implemented here,
 * {@link #getDelegationToken}, issues tokens through a secret manager that is only
 * created when Hadoop security is enabled.
 */
public class LlapProtocolServerImpl
        extends AbstractService
        implements LlapProtocolBlockingPB, LlapManagementProtocolPB {
    private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class);

    /** Number of RPC handlers used for the management endpoint. */
    private static final int MANAGEMENT_HANDLER_COUNT = 2;

    private final int numHandlers;
    private final ContainerRunner containerRunner;
    private final int srvPort;
    private final int mngPort;
    private RPC.Server server;
    private RPC.Server mngServer;
    /** Receives the address of the daemon protocol server once it has been started. */
    private final AtomicReference<InetSocketAddress> srvAddress;
    /** Receives the address of the management protocol server once it has been started. */
    private final AtomicReference<InetSocketAddress> mngAddress;
    /** Stays {@code null} unless {@link #serviceStart()} runs with security enabled. */
    private org.apache.hadoop.hive.llap.security.SecretManager zkSecretManager;

    /**
     * Creates the service; no sockets are opened until {@link #serviceStart()}.
     *
     * @param numHandlers     number of RPC handlers for the daemon protocol server
     * @param containerRunner target of the daemon protocol calls
     * @param srvAddress      holder that is set to the daemon protocol server address on start
     * @param mngAddress      holder that is set to the management server address on start
     * @param srvPort         port for the daemon protocol server
     * @param mngPort         port for the management protocol server
     */
    public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner, AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress, int srvPort, int mngPort) {
        super("LlapDaemonProtocolServerImpl");
        this.numHandlers = numHandlers;
        this.containerRunner = containerRunner;
        this.srvAddress = srvAddress;
        this.srvPort = srvPort;
        this.mngAddress = mngAddress;
        this.mngPort = mngPort;
        LOG.info("Creating: {} with port configured to: {}", LlapProtocolServerImpl.class.getSimpleName(), srvPort);
    }

    /**
     * Forwards a work submission to the container runner.
     *
     * @throws ServiceException wrapping any {@link IOException} raised by the runner
     */
    public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(RpcController controller, LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws ServiceException {
        try {
            return this.containerRunner.submitWork(request);
        } catch (IOException e) {
            throw new ServiceException(e);
        }
    }

    /** Forwards a source-state update to the container runner. */
    public LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) throws ServiceException {
        return this.containerRunner.sourceStateUpdated(request);
    }

    /** Forwards a query-complete notification to the container runner. */
    public LlapDaemonProtocolProtos.QueryCompleteResponseProto queryComplete(RpcController controller, LlapDaemonProtocolProtos.QueryCompleteRequestProto request) throws ServiceException {
        return this.containerRunner.queryComplete(request);
    }

    /** Forwards a fragment termination request to the container runner. */
    public LlapDaemonProtocolProtos.TerminateFragmentResponseProto terminateFragment(RpcController controller, LlapDaemonProtocolProtos.TerminateFragmentRequestProto request) throws ServiceException {
        return this.containerRunner.terminateFragment(request);
    }

    /**
     * Starts both RPC servers.
     *
     * <p>When Hadoop security is disabled the servers are started directly. Otherwise the
     * LLAP Kerberos principal and keytab are read from the configuration, the secret
     * manager is created, and the servers are started inside a {@code doAs} of the UGI
     * obtained from the Kerberos login.
     *
     * @throws RuntimeException if the Kerberos login fails or a server cannot be created
     */
    public void serviceStart() {
        final Configuration conf = this.getConfig();
        final BlockingService daemonImpl = LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
        final BlockingService managementImpl = LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
        if (!UserGroupInformation.isSecurityEnabled()) {
            this.startProtocolServers(conf, daemonImpl, managementImpl);
            return;
        }

        String llapPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL);
        String llapKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
        // Must be assigned before the servers are built: createServer() wires it into the RPC builder.
        this.zkSecretManager = org.apache.hadoop.hive.llap.security.SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);

        final UserGroupInformation daemonUgi;
        try {
            daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        daemonUgi.doAs(new PrivilegedAction<Void>() {

            @Override
            public Void run() {
                LlapProtocolServerImpl.this.startProtocolServers(conf, daemonImpl, managementImpl);
                return null;
            }
        });
    }

    /** Starts the daemon protocol server first, then the management protocol server. */
    private void startProtocolServers(Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
        this.server = this.startProtocolServer(this.srvPort, this.numHandlers, this.srvAddress, conf, daemonImpl, LlapProtocolBlockingPB.class);
        this.mngServer = this.startProtocolServer(this.mngPort, MANAGEMENT_HANDLER_COUNT, this.mngAddress, conf, managementImpl, LlapManagementProtocolPB.class);
    }

    /**
     * Creates and starts one RPC server, then publishes its address into {@code bindAddress}.
     *
     * @param port          port to listen on
     * @param handlerCount  number of RPC handlers
     * @param bindAddress   holder updated with the canonical host name and port of the started server
     * @param conf          configuration used to build the server
     * @param impl          protobuf blocking service that handles the calls
     * @param protocolClass protocol interface the server exposes
     * @return the started server
     * @throws RuntimeException if the server cannot be created
     */
    private RPC.Server startProtocolServer(int port, int handlerCount, AtomicReference<InetSocketAddress> bindAddress, Configuration conf, BlockingService impl, Class<?> protocolClass) {
        RPC.Server rpcServer;
        InetSocketAddress addr = new InetSocketAddress(port);
        try {
            rpcServer = this.createServer(protocolClass, addr, conf, handlerCount, impl);
            rpcServer.start();
        } catch (IOException e) {
            LOG.error("Failed to run RPC Server on port: " + port, e);
            throw new RuntimeException(e);
        }
        InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(rpcServer);
        bindAddress.set(NetUtils.createSocketAddrForHost(serverBindAddress.getAddress().getCanonicalHostName(), serverBindAddress.getPort()));
        LOG.info("Instantiated {} at {}", protocolClass.getSimpleName(), bindAddress);
        return rpcServer;
    }

    /**
     * Stops whichever servers were started. The management server is stopped in a
     * {@code finally} block so that a failure while stopping the daemon protocol server
     * does not leave it running.
     */
    public void serviceStop() {
        try {
            if (this.server != null) {
                this.server.stop();
            }
        } finally {
            if (this.mngServer != null) {
                this.mngServer.stop();
            }
        }
    }

    /** Returns the current value of the daemon protocol address holder. */
    @InterfaceAudience.Private
    @VisibleForTesting
    InetSocketAddress getBindAddress() {
        return this.srvAddress.get();
    }

    /**
     * Builds (but does not start) an RPC server for the given protocol using the protobuf
     * RPC engine. The secret manager is attached when one has been created, and service
     * ACLs are loaded when {@code hadoop.security.authorization} is enabled.
     *
     * @throws IOException if the server cannot be built
     */
    private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, int numHandlers, BlockingService blockingService) throws IOException {
        RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
        RPC.Builder builder = new RPC.Builder(conf)
                .setProtocol(pbProtocol)
                .setInstance(blockingService)
                .setBindAddress(addr.getHostName())
                .setPort(addr.getPort())
                .setNumHandlers(numHandlers);
        if (this.zkSecretManager != null) {
            builder = builder.setSecretManager(this.zkSecretManager);
        }
        RPC.Server rpcServer = builder.build();
        if (conf.getBoolean("hadoop.security.authorization", false)) {
            rpcServer.refreshServiceAcl(conf, new LlapDaemonPolicyProvider());
        }
        return rpcServer;
    }

    /**
     * Issues an LLAP token for the calling user and returns it serialized in the response.
     *
     * <p>The token identifier uses the caller's user name as owner, the caller's short
     * user name as renewer, and the real user's name (when the caller UGI has one) as
     * real user.
     *
     * @throws ServiceException if no secret manager exists (security was not enabled at
     *                          start), if the current user cannot be determined, or if
     *                          the token cannot be serialized
     */
    public LlapDaemonProtocolProtos.GetTokenResponseProto getDelegationToken(RpcController controller, LlapDaemonProtocolProtos.GetTokenRequestProto request) throws ServiceException {
        if (this.zkSecretManager == null) {
            throw new ServiceException("Operation not supported on unsecure cluster");
        }
        UserGroupInformation ugi;
        try {
            ugi = UserGroupInformation.getCurrentUser();
        } catch (IOException e) {
            throw new ServiceException(e);
        }

        Text owner = new Text(ugi.getUserName());
        Text realUser = null;
        if (ugi.getRealUser() != null) {
            realUser = new Text(ugi.getRealUser().getUserName());
        }
        Text renewer = new Text(ugi.getShortUserName());
        LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
        Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this.zkSecretManager);

        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        try {
            token.write(out);
        } catch (IOException e) {
            throw new ServiceException(e);
        }
        ByteString serializedToken = ByteString.copyFrom(out.toByteArray());
        return LlapDaemonProtocolProtos.GetTokenResponseProto.newBuilder().setToken(serializedToken).build();
    }
}

