/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
import org.apache.hadoop.hive.llap.daemon.impl.AMReporter;
import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl;
import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonMXBean;
import org.apache.hadoop.hive.llap.daemon.impl.LlapProtocolServerImpl;
import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Top-level LLAP daemon service.
 *
 * <p>The daemon is a {@link CompositeService} that wires together the container runner, the
 * protocol server, the AM reporter, the registry and (outside of tests) the web services, and
 * additionally manages the shuffle handler, the JVM pause monitor, the metrics system and the
 * JMX info bean that are not child services.
 *
 * <p>It also implements {@link ContainerRunner} by delegating every call to the internal
 * {@link ContainerRunnerImpl}, and {@link LlapDaemonMXBean} to expose its configuration.
 */
public class LlapDaemon
extends CompositeService
implements ContainerRunner,
LlapDaemonMXBean {
    private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
    public static final String LOG4j2_PROPERTIES_FILE = "llap-daemon-log4j2.properties";
    public static final String HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2.properties";

    /** Number of bytes in one MiB; used to align the computed heap size. */
    private static final long BYTES_PER_MB = 1024L * 1024L;

    private final Configuration shuffleHandlerConf;
    private final LlapProtocolServerImpl server;
    private final ContainerRunnerImpl containerRunner;
    private final AMReporter amReporter;
    private final LlapRegistryService registry;
    /** {@code null} when {@code HIVE_IN_TEST} is set in the daemon configuration. */
    private final LlapWebServices webServices;
    private final AtomicLong numSubmissions = new AtomicLong(0L);
    private final JvmPauseMonitor pauseMonitor;
    private final ObjectName llapDaemonInfoBean;
    private final LlapDaemonExecutorMetrics metrics;
    private final boolean llapIoEnabled;
    private final long executorMemoryPerInstance;
    private final long ioMemoryPerInstance;
    private final int numExecutors;
    private final long maxJvmMemory;
    private final String[] localDirs;
    // Shared holders handed to the collaborators created in the constructor. This class only
    // writes shufflePort (in serviceStart); it never writes the two address holders itself.
    private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>();
    private final AtomicReference<InetSocketAddress> mngAddress = new AtomicReference<>();
    private final AtomicReference<Integer> shufflePort = new AtomicReference<>();

    /**
     * Validates the supplied settings and constructs (but does not start) all daemon components.
     *
     * @param daemonConf daemon configuration; the metrics session id is written into it
     * @param numExecutors number of executors, must be positive
     * @param executorMemoryBytes memory reserved for executors, in bytes
     * @param ioEnabled whether LLAP IO is enabled
     * @param isDirectCache whether the IO cache is direct; when {@code true} the IO cache size is
     *     not counted against the heap requirement
     * @param ioMemoryBytes size of the IO cache, in bytes
     * @param localDirs work directories, must be non-empty
     * @param srvPort server RPC port: 0, or in the range 1025-65535
     * @param mngPort management RPC port: 0, or in the range 1025-65535
     * @param shufflePort shuffle port: 0, or in the range 1025-65535
     * @throws IllegalArgumentException if any argument fails validation
     * @throws IllegalStateException if the computed heap size is smaller than the memory required
     */
    public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, int mngPort, int shufflePort) {
        super("LlapDaemon");
        this.initializeLogging();
        this.printAsciiArt();

        Preconditions.checkArgument(numExecutors > 0);
        Preconditions.checkArgument(isAutoOrUnprivilegedPort(srvPort),
            (Object) "Server RPC Port must be between 1025 and 65535, or 0 for automatic selection");
        Preconditions.checkArgument(isAutoOrUnprivilegedPort(mngPort),
            (Object) "Management RPC Port must be between 1025 and 65535, or 0 for automatic selection");
        Preconditions.checkArgument(localDirs != null && localDirs.length > 0,
            (Object) "Work dirs must be specified");
        Preconditions.checkArgument(isAutoOrUnprivilegedPort(shufflePort),
            (Object) "Shuffle Port must be between 1025 and 65535, or 0 for automatic selection");

        this.maxJvmMemory = LlapDaemon.getTotalHeapSize();
        this.llapIoEnabled = ioEnabled;
        this.executorMemoryPerInstance = executorMemoryBytes;
        this.ioMemoryPerInstance = ioMemoryBytes;
        this.numExecutors = numExecutors;
        this.localDirs = localDirs;

        int waitQueueSize = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
        boolean enablePreemption = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
        LOG.info("Attempting to start LlapDaemonConf with the following configuration: numExecutors=" + numExecutors
            + ", rpcListenerPort=" + srvPort
            + ", mngListenerPort=" + mngPort
            + ", workDirs=" + Arrays.toString(localDirs)
            + ", shufflePort=" + shufflePort
            + ", executorMemory=" + executorMemoryBytes
            + ", llapIoEnabled=" + ioEnabled
            + ", llapIoCacheIsDirect=" + isDirectCache
            + ", llapIoCacheSize=" + ioMemoryBytes
            + ", jvmAvailableMemory=" + this.maxJvmMemory
            + ", waitQueueSize= " + waitQueueSize
            + ", enablePreemption= " + enablePreemption);

        // The IO cache only counts against the heap when it is enabled and not direct.
        long heapCacheBytes = (ioEnabled && !isDirectCache) ? ioMemoryBytes : 0L;
        long memRequired = executorMemoryBytes + heapCacheBytes;
        Preconditions.checkState(this.maxJvmMemory >= memRequired,
            (Object) ("Invalid configuration. Xmx value too small. maxAvailable=" + this.maxJvmMemory
                + ", configured(exec + io if enabled)=" + memRequired));

        // The shuffle handler gets its own copy of the configuration so that the overrides below
        // are not written into daemonConf.
        this.shuffleHandlerConf = new Configuration(daemonConf);
        this.shuffleHandlerConf.setInt("llap.shuffle.port", shufflePort);
        this.shuffleHandlerConf.set("llap.shuffle.handler.local-dirs", StringUtils.arrayToString(localDirs));
        this.shuffleHandlerConf.setBoolean("llap.shuffle.dir-watcher.enabled",
            HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED));

        int numHandlers = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);

        LlapMetricsSystem.initialize("LlapDaemon");
        this.pauseMonitor = new JvmPauseMonitor(daemonConf);
        this.pauseMonitor.start();
        String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName();
        String sessionId = MetricsUtils.getUUID();
        daemonConf.set("llap.daemon.metrics.sessionid", sessionId);
        this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors);
        this.metrics.getJvmMetrics().setPauseMonitor(this.pauseMonitor);
        this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this);
        LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId);

        // The proxy reads containerRunner lazily, so it is safe to hand it to the AMReporter
        // before containerRunner itself is assigned a few lines below.
        this.amReporter = new AMReporter(this.srvAddress, new QueryFailedHandlerProxy(), daemonConf);
        this.server = new LlapProtocolServerImpl(numHandlers, this, this.srvAddress, this.mngAddress, srvPort, mngPort);
        this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, enablePreemption, localDirs, this.shufflePort, this.srvAddress, executorMemoryBytes, this.metrics, this.amReporter);
        this.addIfService(this.containerRunner);

        this.registry = new LlapRegistryService(true);
        this.addIfService(this.registry);

        if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
            this.webServices = null;
        } else {
            this.webServices = new LlapWebServices();
            this.addIfService(this.webServices);
        }

        this.addIfService(this.server);
        this.addIfService(this.amReporter);
    }

    /** Returns {@code true} for 0 (automatic selection) or any port in the range 1025-65535. */
    private static boolean isAutoOrUnprivilegedPort(int port) {
        return port == 0 || (port > 1024 && port < 65536);
    }

    /**
     * Initializes log4j2 from {@link #LOG4j2_PROPERTIES_FILE} found on the class path.
     *
     * @throws RuntimeException if the properties file cannot be located
     */
    private void initializeLogging() {
        long start = System.currentTimeMillis();
        URL log4j2Config = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
        if (log4j2Config == null) {
            throw new RuntimeException("Log initialization failed. Unable to locate "
                + LOG4j2_PROPERTIES_FILE + " file in classpath");
        }
        Configurator.initialize("LlapDaemonLog4j2", log4j2Config.toString());
        long end = System.currentTimeMillis();
        LOG.info("LLAP daemon logging initialized from {} in {} ms", log4j2Config, end - start);
    }

    /**
     * Computes the heap size of this JVM by summing the maximum size of every heap memory pool.
     *
     * <p>Pools whose name contains {@code "Survivor"} are counted twice. Pools that report no
     * usage, or an undefined maximum ({@code -1}), are skipped instead of being added to the sum.
     * The result is rounded up to a whole number of MiB.
     *
     * @return the summed heap pool maximum in bytes, rounded up to the next MiB boundary
     */
    public static long getTotalHeapSize() {
        long total = 0L;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (!MemoryType.HEAP.equals(pool.getType())) {
                continue;
            }
            java.lang.management.MemoryUsage usage = pool.getUsage();
            if (usage == null) {
                // getUsage() returns null when the pool is no longer valid.
                continue;
            }
            long max = usage.getMax();
            if (max < 0L) {
                // getMax() returns -1 when the maximum is undefined; that is not a size.
                continue;
            }
            if (pool.getName().contains("Survivor")) {
                max *= 2L;
            }
            total += max;
        }
        // Round up to the next MiB. The previous "total += total % MiB" added the remainder
        // itself, which does not land on a MiB boundary.
        long remainder = total % BYTES_PER_MB;
        if (remainder != 0L) {
            total += BYTES_PER_MB - remainder;
        }
        return total;
    }

    /** Logs the LLAP banner, preceded by two blank lines. */
    private void printAsciiArt() {
        String asciiArt = "$$\\       $$\\        $$$$$$\\  $$$$$$$\\\n$$ |      $$ |      $$  __$$\\ $$  __$$\\\n$$ |      $$ |      $$ /  $$ |$$ |  $$ |\n$$ |      $$ |      $$$$$$$$ |$$$$$$$  |\n$$ |      $$ |      $$  __$$ |$$  ____/\n$$ |      $$ |      $$ |  $$ |$$ |\n$$$$$$$$\\ $$$$$$$$\\ $$ |  $$ |$$ |\n\\________|\\________|\\__|  \\__|\\__|\n\n";
        LOG.info("\n\n" + asciiArt);
    }

    /**
     * Initializes the child services, then marks this process as a daemon in {@link LlapProxy}
     * and initializes LLAP IO from the given configuration.
     *
     * @param conf configuration passed to the child services and to LLAP IO
     * @throws Exception if child initialization or LLAP IO initialization fails
     */
    public void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        LlapProxy.setDaemon(true);
        LlapProxy.initializeLlapIo(conf);
    }

    /**
     * Starts the shuffle handler, publishes the port it reports into the shared holder, and then
     * starts the child services.
     *
     * @throws Exception if the shuffle handler or any child service fails to start
     */
    public void serviceStart() throws Exception {
        ShuffleHandler.initializeAndStart(this.shuffleHandlerConf);
        // Read the port once so the logged value and the published value cannot differ.
        int boundShufflePort = ShuffleHandler.get().getPort();
        LOG.info("Setting shuffle port to: " + boundShufflePort);
        this.shufflePort.set(boundShufflePort);
        super.serviceStart();
        LOG.info("LlapDaemon serviceStart complete");
    }

    /**
     * Stops the child services, then the shuffle handler, then releases the remaining resources
     * via {@link #shutdown()}.
     *
     * <p>The later steps run even when an earlier one throws, so a failing child service does not
     * leave the shuffle handler, MBean, pause monitor or metrics system behind.
     *
     * @throws Exception if stopping the child services or the shuffle handler fails
     */
    public void serviceStop() throws Exception {
        try {
            super.serviceStop();
        } finally {
            try {
                ShuffleHandler.shutdown();
            } finally {
                this.shutdown();
            }
        }
        LOG.info("LlapDaemon shutdown complete");
    }

    /**
     * Releases the resources that are not child services: unregisters the JMX info bean, stops
     * the pause monitor, shuts down the metrics system and closes {@link LlapProxy}.
     *
     * <p>A failure to unregister the bean is logged and ignored.
     */
    public void shutdown() {
        LOG.info("LlapDaemon shutdown invoked");
        if (this.llapDaemonInfoBean != null) {
            try {
                MBeans.unregister(this.llapDaemonInfoBean);
            }
            catch (Throwable ex) {
                LOG.info("Error unregistering the bean; ignoring", ex);
            }
        }
        if (this.pauseMonitor != null) {
            this.pauseMonitor.stop();
        }
        if (this.metrics != null) {
            LlapMetricsSystem.shutdown();
        }
        LlapProxy.close();
    }

    /**
     * Process entry point: reads the daemon settings from an {@link LlapConfiguration}, builds the
     * daemon, registers a shutdown hook for it, then initializes and starts it.
     *
     * <p>Any failure is logged, the partially started daemon (if any) is shut down, and the JVM
     * exits with status {@code -1}.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared for compatibility; failures are handled inside the method
     */
    public static void main(String[] args) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(new LlapDaemonUncaughtExceptionHandler());
        LlapDaemon llapDaemon = null;
        try {
            LlapConfiguration daemonConf = new LlapConfiguration();
            int numExecutors = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);

            // Fall back to the YARN node manager directories when no work dirs are configured.
            String localDirList = HiveConf.getVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_WORK_DIRS);
            if (localDirList == null || localDirList.isEmpty()) {
                localDirList = daemonConf.get("yarn.nodemanager.local-dirs");
            }
            String[] localDirs;
            if (localDirList == null || localDirList.isEmpty()) {
                // An empty array is rejected by the constructor with a descriptive message.
                localDirs = new String[0];
            } else {
                localDirs = StringUtils.getTrimmedStrings(localDirList);
            }

            int rpcPort = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
            int mngPort = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
            int shufflePort = daemonConf.getInt("llap.shuffle.port", 15551);
            // Widen to long before multiplying so the MB -> bytes conversion cannot overflow int.
            long executorMemoryBytes = (long) HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024L * 1024L;
            long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
            boolean isDirectCache = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
            boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);

            llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
            LOG.info("Adding shutdown hook for LlapDaemon");
            ShutdownHookManager.addShutdownHook(new CompositeService.CompositeServiceShutdownHook(llapDaemon), 1);
            llapDaemon.init(daemonConf);
            llapDaemon.start();
            LOG.info("Started LlapDaemon");
        }
        catch (Throwable t) {
            LOG.warn("Failed to start LLAP Daemon with exception", t);
            if (llapDaemon != null) {
                llapDaemon.shutdown();
            }
            System.exit(-1);
        }
    }

    /**
     * Counts the submission and forwards the request to the container runner.
     *
     * @param request the work to submit
     * @return the container runner's response
     * @throws IOException if the container runner rejects or fails the submission
     */
    @Override
    public LlapDaemonProtocolProtos.SubmitWorkResponseProto submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws IOException {
        this.numSubmissions.incrementAndGet();
        return this.containerRunner.submitWork(request);
    }

    /** Forwards the source-state update to the container runner and returns its response. */
    @Override
    public LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto sourceStateUpdated(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) {
        return this.containerRunner.sourceStateUpdated(request);
    }

    /** Forwards the query-complete notification to the container runner and returns its response. */
    @Override
    public LlapDaemonProtocolProtos.QueryCompleteResponseProto queryComplete(LlapDaemonProtocolProtos.QueryCompleteRequestProto request) {
        return this.containerRunner.queryComplete(request);
    }

    /** Forwards the terminate-fragment request to the container runner and returns its response. */
    @Override
    public LlapDaemonProtocolProtos.TerminateFragmentResponseProto terminateFragment(LlapDaemonProtocolProtos.TerminateFragmentRequestProto request) {
        return this.containerRunner.terminateFragment(request);
    }

    /** Returns the number of {@link #submitWork} calls received so far. */
    @VisibleForTesting
    public long getNumSubmissions() {
        return this.numSubmissions.get();
    }

    /** Returns the bind address reported by the protocol server. */
    public InetSocketAddress getListenerAddress() {
        return this.server.getBindAddress();
    }

    /** Returns the port of the bind address reported by the protocol server. */
    @Override
    public int getRpcPort() {
        return this.server.getBindAddress().getPort();
    }

    /** Returns the number of executors this daemon was constructed with. */
    @Override
    public int getNumExecutors() {
        return this.numExecutors;
    }

    /** Returns the port currently reported by the shuffle handler. */
    @Override
    public int getShufflePort() {
        return ShuffleHandler.get().getPort();
    }

    /** Returns the work directories as a comma-separated string, skipping {@code null} entries. */
    @Override
    public String getLocalDirs() {
        return Joiner.on(",").skipNulls().join((Object[]) this.localDirs);
    }

    /** Returns the executor status set reported by the container runner. */
    @Override
    public Set<String> getExecutorsStatus() {
        return this.containerRunner.getExecutorStatus();
    }

    /** Returns the executor memory, in bytes, this daemon was constructed with. */
    @Override
    public long getExecutorMemoryPerInstance() {
        return this.executorMemoryPerInstance;
    }

    /** Returns the IO cache size, in bytes, this daemon was constructed with. */
    @Override
    public long getIoMemoryPerInstance() {
        return this.ioMemoryPerInstance;
    }

    /** Returns whether LLAP IO was enabled when this daemon was constructed. */
    @Override
    public boolean isIoEnabled() {
        return this.llapIoEnabled;
    }

    /** Returns the heap size computed by {@link #getTotalHeapSize()} at construction time. */
    @Override
    public long getMaxJvmMemory() {
        return this.maxJvmMemory;
    }

    /**
     * Forwards query-failure notifications to the enclosing daemon's container runner.
     *
     * <p>This is an inner (non-static) class on purpose: it resolves {@code containerRunner} at
     * call time, which lets it be created before that field is assigned in the constructor.
     */
    private class QueryFailedHandlerProxy
    implements QueryFailedHandler {
        private QueryFailedHandlerProxy() {
        }

        @Override
        public void queryFailed(QueryIdentifier queryIdentifier) {
            LlapDaemon.this.containerRunner.queryFailed(queryIdentifier);
        }
    }

    /**
     * Default uncaught-exception handler installed by {@link LlapDaemon#main}.
     *
     * <p>While a shutdown is in progress the throwable is only logged. Otherwise an
     * {@link OutOfMemoryError} halts the JVM, and any other throwable terminates it, in both
     * cases with status {@code -1}.
     */
    private static class LlapDaemonUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private LlapDaemonUncaughtExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOG.info("UncaughtExceptionHandler invoked");
            if (ShutdownHookManager.isShutdownInProgress()) {
                LOG.warn("Thread {} threw a Throwable, but we are shutting down, so ignoring this", t, e);
                return;
            }
            if (!(e instanceof Error)) {
                LOG.error("Thread {} threw an Exception. Shutting down now...", t, e);
                ExitUtil.terminate(-1);
                return;
            }
            try {
                LOG.error("Thread {} threw an Error.  Shutting down now...", t, e);
            }
            catch (Throwable ignored) {
                // Logging is best effort here; the process must still exit below.
            }
            if (e instanceof OutOfMemoryError) {
                try {
                    // Written straight to stderr rather than through the logger.
                    System.err.println("Halting due to Out Of Memory Error...");
                    e.printStackTrace();
                }
                catch (Throwable ignored) {
                    // Reporting is best effort here; the halt below must still happen.
                }
                ExitUtil.halt(-1);
            } else {
                ExitUtil.terminate(-1);
            }
        }
    }
}

