/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager;
import org.apache.flink.mesos.runtime.clusterframework.MesosJobManager;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/**
 * Entry point of the Mesos application master process.
 *
 * <p>The runner loads the configuration, installs the security context and then, inside that
 * context, starts the actor system, the artifact server, the optional web monitor, the JobManager
 * actors and the Mesos resource manager actor. It blocks until the actor system terminates and
 * then shuts the started services down again.
 *
 * <p>Subclasses may substitute the actor classes through {@link #getResourceManagerClass()},
 * {@link #getJobManagerClass()} and {@link #getArchivistClass()}.
 */
public class MesosApplicationMasterRunner {
    /** Logger shared by the runner and handed to the components it starts. */
    protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);

    /** Registration timeout written into the generated TaskManager configuration. */
    private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5L, TimeUnit.MINUTES);

    /** Snapshot of the process environment; only logged at debug level. */
    private static final Map<String, String> ENV = System.getenv();

    /** Return code used when initialization fails. */
    private static final int INIT_ERROR_EXIT_CODE = 31;

    /** Code handed to the process reapers; presumably the exit code used when a watched actor dies. */
    private static final int ACTOR_DIED_EXIT_CODE = 32;

    /** Highest valid TCP port number. */
    private static final int MAX_PORT = 65535;

    /** Command line options understood by the runner (dynamic properties only). */
    private static final Options ALL_OPTIONS = new Options().addOption(BootstrapTools.newDynamicPropertiesOption());

    /**
     * Process entry point. Logs the environment, registers signal handling and the shutdown
     * safeguard, runs the application master and exits the JVM with the resulting code.
     *
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        int returnCode = new MesosApplicationMasterRunner().run(args);
        System.exit(returnCode);
    }

    /**
     * Parses the command line, loads the configuration, initializes the file systems, installs
     * the security configuration and runs {@link #runPrivileged(Configuration, Configuration)}
     * inside the installed security context.
     *
     * @param args the command line arguments
     * @return the value returned by {@code runPrivileged}, or {@code INIT_ERROR_EXIT_CODE} if
     *     anything thrown during setup or by the secured call reaches this method
     */
    protected int run(String[] args) {
        try {
            // NOTE(review): the full environment may contain secrets; this is debug level only.
            LOG.debug("All environment variables: {}", ENV);

            PosixParser parser = new PosixParser();
            CommandLine cmd = parser.parse(ALL_OPTIONS, args);
            final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
            final Configuration config = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);

            try {
                FileSystem.initialize(config);
            }
            catch (IOException e) {
                throw new IOException("Error while configuring the filesystems.", e);
            }

            SecurityConfiguration sc = new SecurityConfiguration(config);
            SecurityUtils.install(sc);

            return (Integer) SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {

                @Override
                public Integer call() throws Exception {
                    return MesosApplicationMasterRunner.this.runPrivileged(config, dynamicProperties);
                }
            });
        }
        catch (Throwable t) {
            LOG.error("Mesos AppMaster initialization failed", t);
            return INIT_ERROR_EXIT_CODE;
        }
    }

    /**
     * Starts all application master components, waits for the actor system to terminate and
     * then shuts the components down.
     *
     * <p>If any step of the startup throws, every component that was already created is stopped
     * on a best-effort basis (failures are logged, never rethrown) and
     * {@code INIT_ERROR_EXIT_CODE} is returned.
     *
     * @param config the loaded configuration
     * @param dynamicProperties the dynamic properties parsed from the command line; they are
     *     copied into the TaskManager container specification
     * @return {@code 0} after a regular termination of the actor system, or
     *     {@code INIT_ERROR_EXIT_CODE} if startup failed
     */
    protected int runPrivileged(Configuration config, Configuration dynamicProperties) {
        ActorSystem actorSystem = null;
        WebMonitor webMonitor = null;
        MesosArtifactServer artifactServer = null;
        // Declared with its real type so no downcast is needed where a scheduler is required.
        ScheduledExecutorService futureExecutor = null;
        ExecutorService ioExecutor = null;
        MesosServices mesosServices = null;
        HighAvailabilityServices highAvailabilityServices = null;
        MetricRegistryImpl metricRegistry = null;
        try {
            String appMasterHostname = config.getString(JobManagerOptions.ADDRESS, InetAddress.getLocalHost().getHostName());
            LOG.info("App Master Hostname to use: {}", appMasterHostname);

            MesosConfiguration mesosConfig = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, appMasterHostname);

            int numberProcessors = Hardware.getNumberCPUCores();
            futureExecutor = Executors.newScheduledThreadPool(numberProcessors, new ExecutorThreadFactory("mesos-jobmanager-future"));
            ioExecutor = Executors.newFixedThreadPool(numberProcessors, new ExecutorThreadFactory("mesos-jobmanager-io"));

            mesosServices = MesosServicesUtils.createMesosServices(config, appMasterHostname);
            MesosTaskManagerParameters taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);

            // Valid ports are 0..65535; 65536 does not fit into the 16-bit port field.
            int listeningPort = config.getInteger(JobManagerOptions.PORT);
            Preconditions.checkState(
                listeningPort >= 0 && listeningPort <= MAX_PORT,
                (Object) ("Config parameter \"" + JobManagerOptions.PORT.key() + "\" is invalid, it must be between 0 and " + MAX_PORT));

            actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, listeningPort, LOG, true);
            Address address = AkkaUtils.getAddress(actorSystem);
            String akkaHostname = (String) address.host().get();
            int akkaPort = (Integer) address.port().get();
            LOG.info("Actor system bound to hostname {}.", akkaHostname);

            LOG.debug("Starting Artifact Server");
            artifactServer = mesosServices.getArtifactServer();

            // Assemble the TaskManager container specification: dynamic properties first, then
            // the generated TaskManager configuration, then the configured overlays.
            ContainerSpecification taskManagerContainerSpec = new ContainerSpecification();
            taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties);
            Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
                new Configuration(),
                akkaHostname,
                akkaPort,
                taskManagerParameters.containeredParameters().numSlots(),
                TASKMANAGER_REGISTRATION_TIMEOUT);
            taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
            MesosEntrypointUtils.applyOverlays(config, taskManagerContainerSpec);
            LaunchableMesosWorker.configureArtifactServer(artifactServer, taskManagerContainerSpec);

            highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
                config,
                ioExecutor,
                HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

            LOG.debug("Starting Web Frontend");
            Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
            webMonitor = BootstrapTools.startWebMonitorIfConfigured(
                config,
                highAvailabilityServices,
                new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
                new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
                webMonitorTimeout,
                new ScheduledExecutorServiceAdapter(futureExecutor),
                LOG);
            if (webMonitor != null) {
                // Advertise the web UI address through the Mesos framework info.
                URL webMonitorURL = new URL(webMonitor.getRestAddress());
                mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
            }

            LOG.debug("Starting JobManager actor");
            metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
            metricRegistry.startQueryService(actorSystem, null);
            ActorRef jobManager = (ActorRef) JobManager.startJobManagerActors(
                config,
                actorSystem,
                futureExecutor,
                ioExecutor,
                highAvailabilityServices,
                metricRegistry,
                (Option) (webMonitor != null ? Option.apply((Object) webMonitor.getRestAddress()) : Option.empty()),
                (Option) Option.apply((Object) "jobmanager"),
                (Option) Option.apply((Object) "archive"),
                this.getJobManagerClass(),
                this.getArchivistClass())._1();

            LOG.debug("Starting Mesos Flink Resource Manager");
            MesosWorkerStore workerStore = mesosServices.createMesosWorkerStore(config, ioExecutor);
            Props resourceMasterProps = MesosFlinkResourceManager.createActorProps(
                this.getResourceManagerClass(),
                config,
                mesosConfig,
                workerStore,
                highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                taskManagerParameters,
                taskManagerContainerSpec,
                artifactServer,
                LOG);
            ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");

            LOG.debug("Starting process reapers for JobManager");
            actorSystem.actorOf(
                Props.create(ProcessReaper.class, (Object[]) new Object[]{resourceMaster, LOG, ACTOR_DIED_EXIT_CODE}),
                "Mesos_Resource_Master_Process_Reaper");
            actorSystem.actorOf(
                Props.create(ProcessReaper.class, (Object[]) new Object[]{jobManager, LOG, ACTOR_DIED_EXIT_CODE}),
                "JobManager_Process_Reaper");
        }
        catch (Throwable t) {
            LOG.error("Mesos JobManager initialization failed", t);

            // Best-effort cleanup of whatever was started before the failure. Each step is
            // isolated so that one failing shutdown does not prevent the remaining ones.
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                }
                catch (Throwable ignored) {
                    LOG.warn("Failed to stop the web frontend", ignored);
                }
            }
            if (actorSystem != null) {
                try {
                    actorSystem.shutdown();
                }
                catch (Throwable tt) {
                    LOG.error("Error shutting down actor system", tt);
                }
            }
            // Mirrors the regular shutdown sequence: these two were previously leaked when
            // startup failed after they had been created.
            if (highAvailabilityServices != null) {
                try {
                    highAvailabilityServices.close();
                }
                catch (Throwable tt) {
                    LOG.error("Could not properly stop the high availability services.", tt);
                }
            }
            if (metricRegistry != null) {
                try {
                    metricRegistry.shutdown().get();
                }
                catch (Throwable tt) {
                    LOG.error("Could not shut down metric registry.", tt);
                }
            }
            if (futureExecutor != null) {
                try {
                    futureExecutor.shutdownNow();
                }
                catch (Throwable tt) {
                    LOG.error("Error shutting down future executor", tt);
                }
            }
            if (ioExecutor != null) {
                try {
                    ioExecutor.shutdownNow();
                }
                catch (Throwable tt) {
                    LOG.error("Error shutting down io executor", tt);
                }
            }
            if (mesosServices != null) {
                try {
                    mesosServices.close(false);
                }
                catch (Throwable tt) {
                    LOG.error("Error closing the mesos services.", tt);
                }
            }
            return INIT_ERROR_EXIT_CODE;
        }

        LOG.info("Mesos JobManager started");

        // Blocks until the actor system has terminated.
        actorSystem.awaitTermination();

        if (webMonitor != null) {
            try {
                webMonitor.stop();
            }
            catch (Throwable t) {
                LOG.error("Failed to stop the web frontend", t);
            }
        }
        if (highAvailabilityServices != null) {
            try {
                highAvailabilityServices.close();
            }
            catch (Throwable t) {
                // Include the cause; it was previously dropped from the log.
                LOG.error("Could not properly stop the high availability services.", t);
            }
        }
        if (metricRegistry != null) {
            try {
                metricRegistry.shutdown().get();
            }
            catch (Throwable t) {
                LOG.error("Could not shut down metric registry.", t);
            }
        }

        ExecutorUtils.gracefulShutdown(
            AkkaUtils.getTimeout(config).toMillis(),
            TimeUnit.MILLISECONDS,
            new ExecutorService[]{futureExecutor, ioExecutor});

        try {
            mesosServices.close(true);
        }
        catch (Throwable t) {
            LOG.error("Failed to clean up and close MesosServices.", t);
        }
        return 0;
    }

    /**
     * Returns the actor class used for the Mesos resource manager.
     *
     * @return {@link MesosFlinkResourceManager} by default
     */
    protected Class<? extends MesosFlinkResourceManager> getResourceManagerClass() {
        return MesosFlinkResourceManager.class;
    }

    /**
     * Returns the actor class used for the JobManager.
     *
     * @return {@link MesosJobManager} by default
     */
    protected Class<? extends JobManager> getJobManagerClass() {
        return MesosJobManager.class;
    }

    /**
     * Returns the actor class used for the archivist.
     *
     * @return {@link MemoryArchivist} by default
     */
    protected Class<? extends MemoryArchivist> getArchivistClass() {
        return MemoryArchivist.class;
    }
}

