/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.hooks;

import io.prestosql.hive.$internal.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.prestosql.hive.$internal.org.json.JSONObject;
import io.prestosql.hive.$internal.org.slf4j.Logger;
import io.prestosql.hive.$internal.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.plan.ExplainWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hive.common.util.ShutdownHookManager;

public class ATSHook
implements ExecuteWithHookContext {
    // Logger shared by every instance of the hook.
    private static final Logger LOG = LoggerFactory.getLogger(ATSHook.class.getName());
    // Monitor held by setupAtsExecutor while it lazily creates the executors and the timeline client.
    private static final Object LOCK = new Object();
    // Value recorded under OtherInfoTypes.VERSION by createPreHookEvent (which writes 2).
    private static final int VERSION = 2;
    // Single-threaded pool on which timeline entities are built and domain-creation requests run;
    // created in setupAtsExecutor.
    private static ExecutorService executor;
    // Single-threaded pool on which fireAndForget hands finished entities to the timeline client.
    private static ExecutorService senderExecutor;
    // Client used for putDomain/putEntities; started in setupAtsExecutor and stopped by its shutdown hook.
    private static TimelineClient timelineClient;
    // Prefix prepended to the session id to form a per-session domain id.
    private static final String ATS_DOMAIN_PREFIX = "hive_";
    // Set to true by createOrGetDomain the first time it requests the default domain.
    private static boolean defaultATSDomainCreated;
    // Domain id used by createOrGetDomain when SessionState.get() returns null.
    private static final String DEFAULT_ATS_DOMAIN = "hive_default_ats";
    // NOTE(review): presumably the number of seconds the shutdown hook waits in awaitTermination — confirm.
    private static final int WAIT_TIME = 3;
    // Populated in the static initializer; not referenced by any other code in this class.
    private static final String[] PERF_KEYS;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Lazily creates the shared ATS machinery: the executor that builds timeline entities, the
     * executor that sends them, and the timeline client. Subsequent calls are no-ops once
     * {@code executor} has been set.
     *
     * <p>Nothing is published to the static fields until the timeline client has been initialised
     * and started. If any step throws, the statics stay untouched, so the next call retries the
     * whole setup instead of leaving a non-null executor paired with a missing client.
     *
     * <p>A shutdown hook is registered that drains both executors (sharing one budget of
     * {@code WAIT_TIME} seconds) before stopping the timeline client.
     *
     * @param conf configuration from which the bounded queue capacity
     *             ({@code HiveConf.ConfVars.ATSHOOKQUEUECAPACITY}) is read
     */
    private static void setupAtsExecutor(HiveConf conf) {
        synchronized (LOCK) {
            if (executor != null) {
                return;
            }
            int queueCapacity = conf.getIntVar(HiveConf.ConfVars.ATSHOOKQUEUECAPACITY);
            LOG.info("Creating ATS executor queue with capacity " + queueCapacity);
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build();

            // Both pools are single-threaded with a bounded queue; no thread is started until
            // the first task is submitted, so discarding them on a later failure is harmless.
            ExecutorService eventBuilder = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
            ExecutorService eventSender = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);

            TimelineClient client = TimelineClient.createTimelineClient();
            client.init(new YarnConfiguration());
            client.start();

            // Publish only after everything above succeeded; executor is assigned last because
            // it is the field the guard at the top of this method checks.
            timelineClient = client;
            senderExecutor = eventSender;
            executor = eventBuilder;

            ShutdownHookManager.addShutdownHook(new Runnable(){

                @Override
                public void run() {
                    boolean interrupted = false;
                    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT_TIME);
                    try {
                        // Drain the builder first: its tasks are what feed the sender.
                        executor.shutdown();
                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
                        // Give the sender whatever is left of the same budget so queued
                        // entities can still reach the client before it is stopped.
                        senderExecutor.shutdown();
                        senderExecutor.awaitTermination(Math.max(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
                        executor = null;
                    }
                    catch (InterruptedException ie) {
                        // Stop waiting, but remember the interrupt so it can be restored below.
                        interrupted = true;
                    }
                    timelineClient.stop();
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }

    /** Creates the hook; the only work done here is logging that an instance was created. */
    public ATSHook() {
        LOG.info("Created ATS Hook");
    }

    /**
     * Builds a {@link TimelineDomain} from the given id and ACL strings, submits it through the
     * shared timeline client, and logs the result.
     *
     * @param domainId id assigned to the domain
     * @param readers  value passed to {@code TimelineDomain.setReaders}
     * @param writers  value passed to {@code TimelineDomain.setWriters}
     * @throws Exception whatever {@code timelineClient.putDomain} throws
     */
    private void createTimelineDomain(String domainId, String readers, String writers) throws Exception {
        TimelineDomain domain = new TimelineDomain();
        domain.setId(domainId);
        domain.setReaders(readers);
        domain.setWriters(writers);
        timelineClient.putDomain(domain);

        StringBuilder message = new StringBuilder("ATS domain created:");
        message.append(domainId).append("(").append(readers).append(",").append(writers).append(")");
        LOG.info(message.toString());
    }

    /**
     * Returns the timeline domain id to attach to entities for the current call, scheduling the
     * creation of that domain when it has not been requested before.
     *
     * <ul>
     *   <li>When {@code SessionState.get()} is null, the id is {@code DEFAULT_ATS_DOMAIN}. The first
     *       such call requests the domain with the current user's short name as both readers and
     *       writers and sets {@code defaultATSDomainCreated}.</li>
     *   <li>When the session already carries an ATS domain id, that id is returned.</li>
     *   <li>Otherwise the id is {@code ATS_DOMAIN_PREFIX} plus the hook's session id. Reader and
     *       writer ACLs are derived from {@code tez.am.view-acls} and {@code tez.am.modify-acls},
     *       the id is stored on the session, and the domain is requested.</li>
     * </ul>
     *
     * The flag or session id is recorded before the asynchronous request runs, and nothing in this
     * method resets it if that request later fails (the failure is only logged).
     *
     * @param hookContext context of the running hook
     * @return the domain id to use
     * @throws Exception if user or ACL lookup fails, or the creation task cannot be submitted
     */
    private String createOrGetDomain(HookContext hookContext) throws Exception {
        final String domainId;
        String aclReaders = null;
        String aclWriters = null;
        boolean needsCreation = false;

        if (SessionState.get() == null) {
            if (!defaultATSDomainCreated) {
                String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
                aclWriters = currentUser;
                aclReaders = currentUser;
                defaultATSDomainCreated = true;
                needsCreation = true;
            }
            domainId = DEFAULT_ATS_DOMAIN;
        } else if (SessionState.get().getATSDomainId() != null) {
            domainId = SessionState.get().getATSDomainId();
        } else {
            domainId = ATS_DOMAIN_PREFIX + hookContext.getSessionId();
            // The session's domain id is read once more right before the domain is set up.
            if (SessionState.get().getATSDomainId() == null) {
                String requestUser = hookContext.getUserName();
                if (hookContext.getUserName() == null) {
                    requestUser = hookContext.getUgi().getShortUserName();
                }
                boolean addHs2User = HiveConf.getBoolVar(hookContext.getConf(), HiveConf.ConfVars.HIVETEZHS2USERACCESS);
                UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
                String loginUser = null;
                if (loginUgi != null) {
                    loginUser = loginUgi.getShortUserName();
                }
                aclReaders = Utilities.getAclStringWithHiveModification(hookContext.getConf(), "tez.am.view-acls", addHs2User, requestUser, loginUser);
                aclWriters = Utilities.getAclStringWithHiveModification(hookContext.getConf(), "tez.am.modify-acls", addHs2User, requestUser, loginUser);
                SessionState.get().setATSDomainId(domainId);
                needsCreation = true;
            }
        }

        if (needsCreation) {
            this.submitDomainCreation(domainId, aclReaders, aclWriters);
        }
        return domainId;
    }

    /** Queues a task on the shared executor that creates the domain and logs a warning on failure. */
    private void submitDomainCreation(final String domainId, final String readers, final String writers) {
        executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    ATSHook.this.createTimelineDomain(domainId, readers, writers);
                }
                catch (Exception e) {
                    LOG.warn("Failed to create ATS domain " + domainId, e);
                }
            }
        });
    }

    /**
     * Hook entry point. Captures the current time and the per-key durations from the perf logger
     * on the calling thread, makes sure the shared executors exist, resolves the timeline domain,
     * and then queues a task that builds and sends the timeline entity for this hook invocation.
     *
     * <p>Failures during setup or submission, and failures inside the queued task, are logged as
     * warnings and not rethrown. Anything thrown before the {@code try} (for example while copying
     * the configuration or reading the query state) does propagate to the caller.
     *
     * @param hookContext context of the running hook
     */
    @Override
    public void run(final HookContext hookContext) throws Exception {
        final long currentTime = System.currentTimeMillis();
        final HiveConf conf = new HiveConf(hookContext.getConf());
        final QueryState queryState = hookContext.getQueryState();
        final String queryId = queryState.getQueryId();
        final Map<String, Long> durations = new HashMap<String, Long>();
        for (String perfKey : hookContext.getPerfLogger().getEndTimes().keySet()) {
            durations.put(perfKey, hookContext.getPerfLogger().getDuration(perfKey));
        }
        try {
            ATSHook.setupAtsExecutor(conf);
            final String domainId = this.createOrGetDomain(hookContext);
            Runnable eventTask = new Runnable(){

                @Override
                public void run() {
                    try {
                        ATSHook.this.publishEvent(hookContext, conf, queryState, currentTime, durations, domainId);
                    }
                    catch (Exception e) {
                        LOG.warn("Failed to submit plan to ATS for " + queryId, e);
                    }
                }
            };
            executor.submit(eventTask);
        }
        catch (Exception e) {
            LOG.warn("Failed to submit to ATS for " + queryId, e);
        }
    }

    /**
     * Builds the entity matching the hook type and hands it to {@code fireAndForget}. Does nothing
     * when there is no query plan or when the plan contains neither MR nor Tez tasks.
     */
    private void publishEvent(HookContext hookContext, HiveConf conf, QueryState queryState, long hookTime, Map<String, Long> durations, String domainId) throws Exception {
        QueryPlan plan = hookContext.getQueryPlan();
        if (plan == null) {
            return;
        }
        String planQueryId = plan.getQueryId();
        String opId = hookContext.getOperationId();
        long queryStartTime = plan.getQueryStartTime();
        String user = hookContext.getUgi().getShortUserName();
        String requestuser = hookContext.getUserName();
        if (hookContext.getUserName() == null) {
            requestuser = hookContext.getUgi().getUserName();
        }
        int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
        int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
        if (numMrJobs + numTezJobs <= 0) {
            return;
        }
        switch (hookContext.getHookType()) {
            case PRE_EXEC_HOOK: {
                this.publishPreExecEvent(hookContext, conf, queryState, plan, planQueryId, queryStartTime, user, requestuser, numMrJobs, numTezJobs, opId, domainId);
                break;
            }
            case POST_EXEC_HOOK: {
                this.fireAndForget(this.createPostHookEvent(planQueryId, hookTime, user, requestuser, true, opId, durations, domainId));
                break;
            }
            case ON_FAILURE_HOOK: {
                this.fireAndForget(this.createPostHookEvent(planQueryId, hookTime, user, requestuser, false, opId, durations, domainId));
                break;
            }
        }
    }

    /**
     * Gathers what the pre-execution entity needs (formatted explain plan, tables read and
     * written, execution mode, instance address and type, LLAP application id) and sends it.
     */
    private void publishPreExecEvent(HookContext hookContext, HiveConf conf, QueryState queryState, QueryPlan plan, String queryId, long queryStartTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId, String domainId) throws Exception {
        ExplainConfiguration config = new ExplainConfiguration();
        config.setFormatted(true);
        ExplainWork work = new ExplainWork(null, null, plan.getRootTasks(), plan.getFetchTask(), null, config, null);
        ExplainTask explain = (ExplainTask)TaskFactory.get(work, conf, new Task[0]);
        explain.initialize(queryState, plan, null, null);
        String query = plan.getQueryStr();
        JSONObject explainPlan = explain.getJSONPlan(null, work);
        String logID = conf.getLogIdVar(hookContext.getSessionId());
        List<String> tablesRead = this.getTablesFromEntitySet(hookContext.getInputs());
        List<String> tablesWritten = this.getTablesFromEntitySet(hookContext.getOutputs());
        String executionMode = this.getExecutionMode(plan).name();
        String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
        if (hiveInstanceAddress == null) {
            // Fall back to this host's address when the context does not supply one.
            hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
        }
        String hiveInstanceType;
        if (hookContext.isHiveServerQuery()) {
            hiveInstanceType = "HS2";
        } else {
            hiveInstanceType = "CLI";
        }
        ApplicationId llapId = this.determineLlapId(conf, plan);
        TimelineEntity preHookEntity = this.createPreHookEvent(queryId, query, explainPlan, queryStartTime, user, requestuser, numMrJobs, numTezJobs, opId, hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType, hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode, tablesRead, tablesWritten, conf, llapId, domainId);
        this.fireAndForget(preHookEntity);
    }

    /**
     * Collects {@code "<db>.<table>"} names for every entity in the set whose type is
     * {@code Entity.Type.TABLE}; entities of any other type are ignored.
     *
     * @param entities entities to inspect
     * @return a new list of qualified table names, in the set's iteration order
     */
    protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
        List<String> qualifiedNames = new ArrayList<String>();
        Iterator<? extends Entity> cursor = entities.iterator();
        while (cursor.hasNext()) {
            Entity candidate = cursor.next();
            if (candidate.getType() == Entity.Type.TABLE) {
                qualifiedNames.add(candidate.getTable().getDbName() + "." + candidate.getTable().getTableName());
            }
        }
        return qualifiedNames;
    }

    /**
     * Classifies the plan by the kinds of tasks reachable from its root tasks.
     *
     * <ul>
     *   <li>{@code NONE} when there are no MR, Spark or Tez tasks;</li>
     *   <li>{@code SPARK} when there is at least one Spark task;</li>
     *   <li>{@code LLAP} when there are Tez tasks and at least one has LLAP mode set on its work;</li>
     *   <li>{@code TEZ} when there are Tez tasks and none has LLAP mode set;</li>
     *   <li>{@code MR} otherwise.</li>
     * </ul>
     *
     * @param plan query plan to classify
     * @return the execution mode
     */
    protected ExecutionMode getExecutionMode(QueryPlan plan) {
        int mrCount = Utilities.getMRTasks(plan.getRootTasks()).size();
        int sparkCount = Utilities.getSparkTasks(plan.getRootTasks()).size();
        int tezCount = Utilities.getTezTasks(plan.getRootTasks()).size();

        if (mrCount + sparkCount + tezCount == 0) {
            return ExecutionMode.NONE;
        }
        if (sparkCount > 0) {
            return ExecutionMode.SPARK;
        }
        if (tezCount <= 0) {
            return ExecutionMode.MR;
        }
        for (TezTask candidate : Utilities.getTezTasks(plan.getRootTasks())) {
            if (((TezWork)candidate.getWork()).getLlapMode()) {
                return ExecutionMode.LLAP;
            }
        }
        return ExecutionMode.TEZ;
    }

    /**
     * Builds the timeline entity published when a query is about to execute.
     *
     * <p>The entity carries:
     * <ul>
     *   <li>primary filters for user, request user, execution mode, queue (read from
     *       {@code mapreduce.job.queuename}), the operation id when non-null, and one entry per
     *       table read or written;</li>
     *   <li>a {@code QUERY_SUBMITTED} event stamped with {@code startTime};</li>
     *   <li>other-info entries for the query text and plan (as a JSON string), Tez/MR usage,
     *       session id, log id, thread id, version, client IP (when non-null), Hive instance
     *       address and type, the configuration (as a JSON string) and the LLAP application id
     *       (when non-null).</li>
     * </ul>
     *
     * <p>Side effect: {@code conf.stripHiddenConfigurations(conf)} is invoked on the passed-in
     * configuration before it is serialised.
     *
     * @return the populated entity, tagged with {@code domainId}
     * @throws Exception if building the JSON payloads fails
     */
    TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan, long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId, String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType, String sessionID, String logID, String threadId, String executionMode, List<String> tablesRead, List<String> tablesWritten, HiveConf conf, ApplicationId llapAppId, String domainId) throws Exception {
        // A LinkedHashMap is supplied as the backing map so queryText is inserted before queryPlan.
        JSONObject queryObj = new JSONObject(new LinkedHashMap<String, Object>());
        queryObj.put("queryText", query);
        queryObj.put("queryPlan", explainPlan);
        LOG.info("Received pre-hook notification for :" + queryId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Otherinfo: " + queryObj.toString());
            LOG.debug("Operation id: <" + opId + ">");
        }

        // Strip hidden settings before the configuration is copied into the entity.
        conf.stripHiddenConfigurations(conf);
        Map<String, String> confMap = new HashMap<String, String>();
        for (Map.Entry<String, String> setting : conf) {
            confMap.put(setting.getKey(), setting.getValue());
        }
        JSONObject confObj = new JSONObject(confMap);

        TimelineEntity atsEntity = new TimelineEntity();
        atsEntity.setEntityId(queryId);
        atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
        atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
        atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
        atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode);
        atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename"));
        if (opId != null) {
            atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
        }
        for (String tabName : tablesRead) {
            atsEntity.addPrimaryFilter(PrimaryFilterTypes.tablesread.name(), tabName);
        }
        for (String tabName : tablesWritten) {
            atsEntity.addPrimaryFilter(PrimaryFilterTypes.tableswritten.name(), tabName);
        }

        TimelineEvent startEvt = new TimelineEvent();
        startEvt.setEventType(EventTypes.QUERY_SUBMITTED.name());
        startEvt.setTimestamp(startTime);
        atsEntity.addEvent(startEvt);

        atsEntity.addOtherInfo(OtherInfoTypes.QUERY.name(), queryObj.toString());
        // NOTE(review): TEZ and MAPRED are stored as the integers 1/0, not booleans — confirm
        // that is what consumers of these entries expect.
        atsEntity.addOtherInfo(OtherInfoTypes.TEZ.name(), numTezJobs > 0 ? 1 : 0);
        atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0 ? 1 : 0);
        atsEntity.addOtherInfo(OtherInfoTypes.SESSION_ID.name(), sessionID);
        atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
        atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), threadId);
        atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
        if (clientIpAddress != null) {
            atsEntity.addOtherInfo(OtherInfoTypes.CLIENT_IP_ADDRESS.name(), clientIpAddress);
        }
        atsEntity.addOtherInfo(OtherInfoTypes.HIVE_ADDRESS.name(), hiveInstanceAddress);
        atsEntity.addOtherInfo(OtherInfoTypes.HIVE_INSTANCE_TYPE.name(), hiveInstanceType);
        atsEntity.addOtherInfo(OtherInfoTypes.CONF.name(), confObj.toString());
        if (llapAppId != null) {
            atsEntity.addOtherInfo(OtherInfoTypes.LLAP_APP_ID.name(), llapAppId.toString());
        }
        atsEntity.setDomainId(domainId);
        return atsEntity;
    }

    /**
     * Builds the timeline entity published after a query finished or failed.
     *
     * <p>The entity carries primary filters for user, request user and (when non-null) the
     * operation id, a {@code QUERY_COMPLETED} event stamped with {@code stopTime}, the success
     * flag under {@code STATUS}, and the given durations serialised as a JSON string under
     * {@code PERF}.
     *
     * @return the populated entity, tagged with {@code domainId}
     * @throws Exception if building the JSON payload fails
     */
    TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success, String opId, Map<String, Long> durations, String domainId) throws Exception {
        LOG.info("Received post-hook notification for :" + queryId);

        TimelineEntity result = new TimelineEntity();
        result.setEntityId(queryId);
        result.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
        result.addPrimaryFilter(PrimaryFilterTypes.user.name(), (Object)user);
        result.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), (Object)requestuser);
        if (opId != null) {
            result.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), (Object)opId);
        }

        TimelineEvent completion = new TimelineEvent();
        completion.setEventType(EventTypes.QUERY_COMPLETED.name());
        completion.setTimestamp(stopTime);
        result.addEvent(completion);

        result.addOtherInfo(OtherInfoTypes.STATUS.name(), (Object)success);

        JSONObject perfJson = new JSONObject(new LinkedHashMap<String, Object>());
        for (Map.Entry<String, Long> timing : durations.entrySet()) {
            perfJson.put(timing.getKey(), timing.getValue());
        }
        result.addOtherInfo(OtherInfoTypes.PERF.name(), (Object)perfJson.toString());

        result.setDomainId(domainId);
        return result;
    }

    /**
     * Queues the entity on the sender executor, where it is passed to
     * {@code timelineClient.putEntities}. A failure of that call is logged as a warning and
     * otherwise ignored; the submission itself is not guarded.
     *
     * @param entity entity to send
     */
    void fireAndForget(final TimelineEntity entity) throws Exception {
        Runnable sendTask = new Runnable(){

            @Override
            public void run() {
                try {
                    TimelineEntity[] batch = new TimelineEntity[]{entity};
                    timelineClient.putEntities(batch);
                }
                catch (Exception sendFailure) {
                    LOG.warn("Failed to send event to ATS", sendFailure);
                }
            }
        };
        senderExecutor.submit(sendTask);
    }

    /**
     * Looks up the LLAP application id for a plan that uses LLAP.
     *
     * <p>Returns null when no Tez task in the plan has LLAP mode set on its work, or when
     * {@code HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS} is null or empty (the latter is logged).
     * Otherwise the id is obtained from the LLAP registry client and logged together with the hosts.
     *
     * @param conf configuration used for the hosts setting and the registry client
     * @param plan query plan whose Tez tasks are inspected
     * @return the LLAP application id, or null as described above
     * @throws IOException declared on the method; presumably raised by the registry lookup
     */
    private ApplicationId determineLlapId(HiveConf conf, QueryPlan plan) throws IOException {
        boolean usesLlap = false;
        for (TezTask candidate : Utilities.getTezTasks(plan.getRootTasks())) {
            if (((TezWork)candidate.getWork()).getLlapMode()) {
                usesLlap = true;
                break;
            }
        }
        if (!usesLlap) {
            return null;
        }

        String hosts = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
        if (hosts == null || hosts.isEmpty()) {
            LOG.info("Cannot determine LLAP instance on client - service hosts are not set");
            return null;
        }
        ApplicationId llapId = LlapRegistryService.getClient((Configuration)conf).getApplicationId();
        LOG.info("The query will use LLAP instance " + llapId + " (" + hosts + ")");
        return llapId;
    }

    // Static initializer: clears the default-domain flag and fills in PERF_KEYS, which is
    // declared final without an initializer and therefore has to be assigned here.
    static {
        defaultATSDomainCreated = false;
        PERF_KEYS = new String[]{"parse", "compile", "semanticAnalyze", "optimizer", "getSplits", "runTasks"};
    }

    /**
     * Keys of the primary filters attached to timeline entities. Each constant's {@code name()}
     * is used verbatim as the filter key, which is why the constants are lower-case.
     */
    private static enum PrimaryFilterTypes {
        user,
        requestuser,
        operationid,
        executionmode,
        tablesread,
        tableswritten,
        queue;

    }

    /**
     * Execution modes returned by {@code getExecutionMode}; the constant's {@code name()} is
     * what gets recorded in the {@code executionmode} primary filter.
     */
    private static enum ExecutionMode {
        MR,
        TEZ,
        LLAP,
        SPARK,
        NONE;

    }

    /**
     * Keys of the "other info" entries attached to timeline entities; each constant's
     * {@code name()} is used verbatim as the key.
     */
    private static enum OtherInfoTypes {
        QUERY,
        STATUS,
        TEZ,
        MAPRED,
        INVOKER_INFO,
        SESSION_ID,
        THREAD_NAME,
        VERSION,
        CLIENT_IP_ADDRESS,
        HIVE_ADDRESS,
        HIVE_INSTANCE_TYPE,
        CONF,
        PERF,
        LLAP_APP_ID;

    }

    /**
     * Event types placed on timeline events: {@code QUERY_SUBMITTED} by the pre-hook entity and
     * {@code QUERY_COMPLETED} by the post-hook entity.
     */
    private static enum EventTypes {
        QUERY_SUBMITTED,
        QUERY_COMPLETED;

    }

    /** Entity type set on every timeline entity built by this hook. */
    private static enum EntityTypes {
        HIVE_QUERY_ID;

    }
}

