/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.functions.Action1;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.runtime.clusterframework.RegisteredMesosWorkerNode;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import org.apache.flink.mesos.scheduler.Tasks;
import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
import org.apache.flink.mesos.scheduler.messages.FrameworkMessage;
import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class MesosResourceManager
extends ResourceManager<RegisteredMesosWorkerNode> {
    protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceManager.class);
    private final Configuration flinkConfig;
    private final MesosConfiguration mesosConfig;
    private final MesosServices mesosServices;
    private final MesosTaskManagerParameters taskManagerParameters;
    private final ContainerSpecification taskManagerContainerSpec;
    private final MesosArtifactServer artifactServer;
    private MesosWorkerStore workerStore;
    private final ActorSystem actorSystem;
    private SchedulerDriver schedulerDriver;
    private ActorRef selfActor;
    private ActorRef connectionMonitor;
    private ActorRef taskMonitor;
    private ActorRef launchCoordinator;
    private ActorRef reconciliationCoordinator;
    final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;

    public MesosResourceManager(RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, Configuration flinkConfig, MesosServices mesosServices, MesosConfiguration mesosConfig, MesosTaskManagerParameters taskManagerParameters, ContainerSpecification taskManagerContainerSpec) {
        super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, jobLeaderIdService, clusterInformation, fatalErrorHandler);
        this.mesosServices = (MesosServices)Preconditions.checkNotNull((Object)mesosServices);
        this.actorSystem = (ActorSystem)Preconditions.checkNotNull((Object)mesosServices.getLocalActorSystem());
        this.flinkConfig = (Configuration)Preconditions.checkNotNull((Object)flinkConfig);
        this.mesosConfig = (MesosConfiguration)Preconditions.checkNotNull((Object)mesosConfig);
        this.artifactServer = (MesosArtifactServer)Preconditions.checkNotNull((Object)mesosServices.getArtifactServer());
        this.taskManagerParameters = (MesosTaskManagerParameters)Preconditions.checkNotNull((Object)taskManagerParameters);
        this.taskManagerContainerSpec = (ContainerSpecification)Preconditions.checkNotNull((Object)taskManagerContainerSpec);
        this.workersInNew = new HashMap<ResourceID, MesosWorkerStore.Worker>(8);
        this.workersInLaunch = new HashMap<ResourceID, MesosWorkerStore.Worker>(8);
        this.workersBeingReturned = new HashMap<ResourceID, MesosWorkerStore.Worker>(8);
    }

    protected ActorRef createSelfActor() {
        return this.actorSystem.actorOf(Props.create(AkkaAdapter.class, (Object[])new Object[]{this}), "ResourceManager");
    }

    protected ActorRef createConnectionMonitor() {
        return this.actorSystem.actorOf(ConnectionMonitor.createActorProps(ConnectionMonitor.class, this.flinkConfig), "connectionMonitor");
    }

    protected ActorRef createTaskMonitor(SchedulerDriver schedulerDriver) {
        return this.actorSystem.actorOf(Tasks.createActorProps(Tasks.class, this.selfActor, this.flinkConfig, schedulerDriver, TaskMonitor.class), "tasks");
    }

    protected ActorRef createLaunchCoordinator(SchedulerDriver schedulerDriver, ActorRef selfActor) {
        return this.actorSystem.actorOf(LaunchCoordinator.createActorProps(LaunchCoordinator.class, selfActor, this.flinkConfig, schedulerDriver, MesosResourceManager.createOptimizer()), "launchCoordinator");
    }

    protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriver) {
        return this.actorSystem.actorOf(ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, this.flinkConfig, schedulerDriver), "reconciliationCoordinator");
    }

    protected void initialize() throws ResourceManagerException {
        try {
            this.workerStore = this.mesosServices.createMesosWorkerStore(this.flinkConfig, this.getRpcService().getExecutor());
            this.workerStore.start();
        }
        catch (Exception e) {
            throw new ResourceManagerException("Unable to initialize the worker store.", (Throwable)e);
        }
        Protos.FrameworkInfo.Builder frameworkInfo = this.mesosConfig.frameworkInfo().clone().setCheckpoint(true);
        try {
            Option<Protos.FrameworkID> frameworkID = this.workerStore.getFrameworkID();
            if (frameworkID.isEmpty()) {
                LOG.info("Registering as new framework.");
            } else {
                LOG.info("Recovery scenario: re-registering using framework ID {}.", (Object)((Protos.FrameworkID)frameworkID.get()).getValue());
                frameworkInfo.setId((Protos.FrameworkID)frameworkID.get());
            }
        }
        catch (Exception e) {
            throw new ResourceManagerException("Unable to recover the framework ID.", (Throwable)e);
        }
        MesosConfiguration initializedMesosConfig = this.mesosConfig.withFrameworkInfo(frameworkInfo);
        MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
        this.schedulerDriver = initializedMesosConfig.createDriver(new MesosResourceManagerSchedulerCallback(), false);
        this.selfActor = this.createSelfActor();
        this.connectionMonitor = this.createConnectionMonitor();
        this.launchCoordinator = this.createLaunchCoordinator(this.schedulerDriver, this.selfActor);
        this.reconciliationCoordinator = this.createReconciliationCoordinator(this.schedulerDriver);
        this.taskMonitor = this.createTaskMonitor(this.schedulerDriver);
        try {
            this.recoverWorkers();
        }
        catch (Exception e) {
            throw new ResourceManagerException("Unable to recover Mesos worker state.", (Throwable)e);
        }
        try {
            LaunchableMesosWorker.configureArtifactServer(this.artifactServer, this.taskManagerContainerSpec);
        }
        catch (IOException e) {
            throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", (Throwable)e);
        }
        this.connectionMonitor.tell((Object)new ConnectionMonitor.Start(), this.selfActor);
        this.schedulerDriver.start();
        LOG.info("Mesos resource manager initialized.");
    }

    private void recoverWorkers() throws Exception {
        List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = this.workerStore.recoverWorkers();
        assert (this.workersInNew.isEmpty());
        assert (this.workersInLaunch.isEmpty());
        assert (this.workersBeingReturned.isEmpty());
        if (!tasksFromPreviousAttempts.isEmpty()) {
            LOG.info("Retrieved {} TaskManagers from previous attempt", (Object)tasksFromPreviousAttempts.size());
            ArrayList<Tuple2<TaskRequest, String>> toAssign = new ArrayList<Tuple2<TaskRequest, String>>(tasksFromPreviousAttempts.size());
            for (MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
                LaunchableMesosWorker launchable = this.createLaunchableMesosWorker(worker.taskID(), worker.profile());
                switch (worker.state()) {
                    case New: {
                        this.workerStore.removeWorker(worker.taskID());
                        break;
                    }
                    case Launched: {
                        this.workersInLaunch.put(MesosResourceManager.extractResourceID(worker.taskID()), worker);
                        toAssign.add((Tuple2<TaskRequest, String>)new Tuple2((Object)launchable.taskRequest(), worker.hostname().get()));
                        break;
                    }
                    case Released: {
                        this.workersBeingReturned.put(MesosResourceManager.extractResourceID(worker.taskID()), worker);
                    }
                }
                this.taskMonitor.tell((Object)new TaskMonitor.TaskGoalStateUpdated(MesosResourceManager.extractGoalState(worker)), this.selfActor);
            }
            if (toAssign.size() >= 1) {
                this.launchCoordinator.tell((Object)new LaunchCoordinator.Assign(toAssign), this.selfActor);
            }
        }
    }

    public CompletableFuture<Void> postStop() {
        FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
        CompletableFuture<Boolean> stopTaskMonitorFuture = this.stopActor(this.taskMonitor, stopTimeout);
        this.taskMonitor = null;
        CompletableFuture<Boolean> stopConnectionMonitorFuture = this.stopActor(this.connectionMonitor, stopTimeout);
        this.connectionMonitor = null;
        CompletableFuture<Boolean> stopLaunchCoordinatorFuture = this.stopActor(this.launchCoordinator, stopTimeout);
        this.launchCoordinator = null;
        CompletableFuture<Boolean> stopReconciliationCoordinatorFuture = this.stopActor(this.reconciliationCoordinator, stopTimeout);
        this.reconciliationCoordinator = null;
        CompletableFuture<Void> stopFuture = CompletableFuture.allOf(stopTaskMonitorFuture, stopConnectionMonitorFuture, stopLaunchCoordinatorFuture, stopReconciliationCoordinatorFuture);
        CompletableFuture terminationFuture = super.postStop();
        return stopFuture.thenCombine((CompletionStage)terminationFuture, (voidA, voidB) -> null);
    }

    protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) throws ResourceManagerException {
        LOG.info("Shutting down and unregistering as a Mesos framework.");
        Exception exception = null;
        try {
            this.schedulerDriver.stop(false);
        }
        catch (Exception ex) {
            exception = new Exception("Could not unregister the Mesos framework.", ex);
        }
        try {
            this.workerStore.stop(true);
        }
        catch (Exception ex) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)new Exception("Could not stop the Mesos worker store.", ex), (Throwable)exception);
        }
        if (exception != null) {
            throw new ResourceManagerException("Could not properly shut down the Mesos application.", (Throwable)exception);
        }
    }

    public void startNewWorker(ResourceProfile resourceProfile) {
        LOG.info("Starting a new worker.");
        try {
            MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(this.workerStore.newTaskID(), resourceProfile);
            this.workerStore.putWorker(worker);
            this.workersInNew.put(MesosResourceManager.extractResourceID(worker.taskID()), worker);
            LaunchableMesosWorker launchable = this.createLaunchableMesosWorker(worker.taskID(), resourceProfile);
            LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", new Object[]{launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs()});
            this.taskMonitor.tell((Object)new TaskMonitor.TaskGoalStateUpdated(MesosResourceManager.extractGoalState(worker)), this.selfActor);
            this.launchCoordinator.tell((Object)new LaunchCoordinator.Launch(Collections.singletonList(launchable)), this.selfActor);
        }
        catch (Exception ex) {
            this.onFatalError((Throwable)new ResourceManagerException("Unable to request new workers.", (Throwable)ex));
        }
    }

    public boolean stopWorker(RegisteredMesosWorkerNode workerNode) {
        LOG.info("Stopping worker {}.", (Object)workerNode.getResourceID());
        try {
            if (this.workersInLaunch.containsKey(workerNode.getResourceID())) {
                MesosWorkerStore.Worker worker = this.workersInLaunch.remove(workerNode.getResourceID());
                worker = worker.releaseWorker();
                this.workerStore.putWorker(worker);
                this.workersBeingReturned.put(MesosResourceManager.extractResourceID(worker.taskID()), worker);
                this.taskMonitor.tell((Object)new TaskMonitor.TaskGoalStateUpdated(MesosResourceManager.extractGoalState(worker)), this.selfActor);
                if (worker.hostname().isDefined()) {
                    this.launchCoordinator.tell((Object)new LaunchCoordinator.Unassign(worker.taskID(), (String)worker.hostname().get()), this.selfActor);
                }
            } else if (this.workersBeingReturned.containsKey(workerNode.getResourceID())) {
                LOG.info("Ignoring request to stop worker {} because it is already being stopped.", (Object)workerNode.getResourceID());
            } else {
                LOG.warn("Unrecognized worker {}.", (Object)workerNode.getResourceID());
            }
        }
        catch (Exception e) {
            this.onFatalError((Throwable)new ResourceManagerException("Unable to release a worker.", (Throwable)e));
        }
        return true;
    }

    public void cancelNewWorker(ResourceProfile resourceProfile) {
    }

    protected int getNumberAllocatedWorkers() {
        return this.workersInNew.size() + this.workersInLaunch.size() + this.workersBeingReturned.size();
    }

    protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
        MesosWorkerStore.Worker inLaunch = this.workersInLaunch.get(resourceID);
        if (inLaunch != null) {
            return new RegisteredMesosWorkerNode(inLaunch);
        }
        return null;
    }

    protected void registered(Registered message) {
        this.connectionMonitor.tell((Object)message, this.selfActor);
        try {
            this.workerStore.setFrameworkID((Option<Protos.FrameworkID>)Option.apply((Object)message.frameworkId()));
        }
        catch (Exception ex) {
            this.onFatalError((Throwable)new ResourceManagerException("Unable to store the assigned framework ID.", (Throwable)ex));
            return;
        }
        this.launchCoordinator.tell((Object)message, this.selfActor);
        this.reconciliationCoordinator.tell((Object)message, this.selfActor);
        this.taskMonitor.tell((Object)message, this.selfActor);
    }

    protected void reregistered(ReRegistered message) {
        this.connectionMonitor.tell((Object)message, this.selfActor);
        this.launchCoordinator.tell((Object)message, this.selfActor);
        this.reconciliationCoordinator.tell((Object)message, this.selfActor);
        this.taskMonitor.tell((Object)message, this.selfActor);
    }

    protected void disconnected(Disconnected message) {
        this.connectionMonitor.tell((Object)message, this.selfActor);
        this.launchCoordinator.tell((Object)message, this.selfActor);
        this.reconciliationCoordinator.tell((Object)message, this.selfActor);
        this.taskMonitor.tell((Object)message, this.selfActor);
    }

    protected void resourceOffers(ResourceOffers message) {
        this.launchCoordinator.tell((Object)message, this.selfActor);
    }

    protected void offerRescinded(OfferRescinded message) {
        this.launchCoordinator.tell((Object)message, this.selfActor);
    }

    protected void statusUpdate(StatusUpdate message) {
        this.taskMonitor.tell((Object)message, this.selfActor);
        this.reconciliationCoordinator.tell((Object)message, this.selfActor);
        this.schedulerDriver.acknowledgeStatusUpdate(message.status());
    }

    protected void frameworkMessage(FrameworkMessage message) {
    }

    protected void slaveLost(SlaveLost message) {
    }

    protected void executorLost(ExecutorLost message) {
    }

    public void acceptOffers(AcceptOffers msg) {
        try {
            ArrayList<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<TaskMonitor.TaskGoalStateUpdated>(msg.operations().size());
            for (Protos.Offer.Operation op : msg.operations()) {
                if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) continue;
                for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
                    MesosWorkerStore.Worker worker = this.workersInNew.remove(MesosResourceManager.extractResourceID(info.getTaskId()));
                    assert (worker != null);
                    worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
                    this.workerStore.putWorker(worker);
                    this.workersInLaunch.put(MesosResourceManager.extractResourceID(worker.taskID()), worker);
                    LOG.info("Launching Mesos task {} on host {}.", (Object)worker.taskID().getValue(), worker.hostname().get());
                    toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(MesosResourceManager.extractGoalState(worker)));
                }
            }
            for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
                this.taskMonitor.tell((Object)update, this.selfActor);
            }
            this.schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
        }
        catch (Exception ex) {
            this.onFatalError((Throwable)new ResourceManagerException("unable to accept offers", (Throwable)ex));
        }
    }

    public void reconcile(ReconciliationCoordinator.Reconcile message) {
        this.reconciliationCoordinator.tell((Object)message, this.selfActor);
    }

    public void taskTerminated(TaskMonitor.TaskTerminated message) {
        boolean existed;
        Protos.TaskID taskID = message.taskID();
        Protos.TaskStatus status = message.status();
        ResourceID id = MesosResourceManager.extractResourceID(taskID);
        try {
            existed = this.workerStore.removeWorker(taskID);
        }
        catch (Exception ex) {
            this.onFatalError((Throwable)new ResourceManagerException("unable to remove worker", (Throwable)ex));
            return;
        }
        if (!existed) {
            LOG.info("Received a termination notice for an unrecognized worker: {}", (Object)id);
            return;
        }
        assert (!this.workersInNew.containsKey(id));
        if (this.workersBeingReturned.remove(id) != null) {
            LOG.info("Worker {} finished successfully with message: {}", (Object)id, (Object)status.getMessage());
        } else {
            MesosWorkerStore.Worker launched = this.workersInLaunch.remove(id);
            assert (launched != null);
            LOG.info("Worker {} failed with status: {}, reason: {}, message: {}.", new Object[]{id, status.getState(), status.getReason(), status.getMessage()});
            this.startNewWorker(launched.profile());
        }
        this.closeTaskManagerConnection(id, new Exception(status.getMessage()));
    }

    private CompletableFuture<Boolean> stopActor(ActorRef actorRef, FiniteDuration timeout) {
        return FutureUtils.toJava((Future)Patterns.gracefulStop((ActorRef)actorRef, (FiniteDuration)timeout)).exceptionally(throwable -> {
            this.actorSystem.stop(actorRef);
            this.log.warn("Could not stop actor {} gracefully.", (Object)actorRef.path(), throwable);
            return true;
        });
    }

    private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) {
        MesosTaskManagerParameters params = new MesosTaskManagerParameters(resourceProfile.getCpuCores() < 1.0 ? this.taskManagerParameters.cpus() : resourceProfile.getCpuCores(), this.taskManagerParameters.gpus(), this.taskManagerParameters.containerType(), this.taskManagerParameters.containerImageName(), new ContaineredTaskManagerParameters(ResourceProfile.UNKNOWN.equals((Object)resourceProfile) ? this.taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : (long)resourceProfile.getMemoryInMB(), ResourceProfile.UNKNOWN.equals((Object)resourceProfile) ? this.taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : (long)resourceProfile.getHeapMemoryInMB(), ResourceProfile.UNKNOWN.equals((Object)resourceProfile) ? this.taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : (long)resourceProfile.getDirectMemoryInMB(), 1, new HashMap(this.taskManagerParameters.containeredParameters().taskManagerEnv())), this.taskManagerParameters.containerVolumes(), this.taskManagerParameters.dockerParameters(), this.taskManagerParameters.constraints(), this.taskManagerParameters.command(), this.taskManagerParameters.bootstrapCommand(), this.taskManagerParameters.getTaskManagerHostname());
        LOG.debug("LaunchableMesosWorker parameters: {}", (Object)params);
        LaunchableMesosWorker launchable = new LaunchableMesosWorker(this.artifactServer, params, this.taskManagerContainerSpec, taskID, this.mesosConfig);
        return launchable;
    }

    static ResourceID extractResourceID(Protos.TaskID taskId) {
        return new ResourceID(taskId.getValue());
    }

    static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
        switch (worker.state()) {
            case New: {
                return new TaskMonitor.New(worker.taskID());
            }
            case Launched: {
                return new TaskMonitor.Launched(worker.taskID(), (Protos.SlaveID)worker.slaveID().get());
            }
            case Released: {
                return new TaskMonitor.Released(worker.taskID(), (Protos.SlaveID)worker.slaveID().get());
            }
        }
        throw new IllegalArgumentException("unsupported worker state");
    }

    private static TaskSchedulerBuilder createOptimizer() {
        return new TaskSchedulerBuilder(){
            TaskScheduler.Builder builder = new TaskScheduler.Builder();

            @Override
            public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
                this.builder.withLeaseRejectAction(action);
                return this;
            }

            @Override
            public TaskScheduler build() {
                return this.builder.build();
            }
        };
    }

    private class AkkaAdapter
    extends UntypedActor {
        private AkkaAdapter() {
        }

        public void onReceive(final Object message) throws Exception {
            if (message instanceof ReconciliationCoordinator.Reconcile) {
                MesosResourceManager.this.runAsync(new Runnable(){

                    @Override
                    public void run() {
                        MesosResourceManager.this.reconcile((ReconciliationCoordinator.Reconcile)message);
                    }
                });
            } else if (message instanceof TaskMonitor.TaskTerminated) {
                MesosResourceManager.this.runAsync(new Runnable(){

                    @Override
                    public void run() {
                        MesosResourceManager.this.taskTerminated((TaskMonitor.TaskTerminated)message);
                    }
                });
            } else if (message instanceof AcceptOffers) {
                MesosResourceManager.this.runAsync(new Runnable(){

                    @Override
                    public void run() {
                        MesosResourceManager.this.acceptOffers((AcceptOffers)message);
                    }
                });
            } else {
                LOG.error("unrecognized message: " + message);
            }
        }
    }

    private class MesosResourceManagerSchedulerCallback
    implements Scheduler {
        private MesosResourceManagerSchedulerCallback() {
        }

        @Override
        public void registered(SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.registered(new Registered(frameworkId, masterInfo));
                }
            });
        }

        @Override
        public void reregistered(SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.reregistered(new ReRegistered(masterInfo));
                }
            });
        }

        @Override
        public void resourceOffers(SchedulerDriver driver, final List<Protos.Offer> offers) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.resourceOffers(new ResourceOffers(offers));
                }
            });
        }

        @Override
        public void offerRescinded(SchedulerDriver driver, final Protos.OfferID offerId) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.offerRescinded(new OfferRescinded(offerId));
                }
            });
        }

        @Override
        public void statusUpdate(SchedulerDriver driver, final Protos.TaskStatus status) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.statusUpdate(new StatusUpdate(status));
                }
            });
        }

        @Override
        public void frameworkMessage(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] data) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.frameworkMessage(new FrameworkMessage(executorId, slaveId, data));
                }
            });
        }

        @Override
        public void disconnected(SchedulerDriver driver) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.disconnected(new Disconnected());
                }
            });
        }

        @Override
        public void slaveLost(SchedulerDriver driver, final Protos.SlaveID slaveId) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.slaveLost(new SlaveLost(slaveId));
                }
            });
        }

        @Override
        public void executorLost(SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int status) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.executorLost(new ExecutorLost(executorId, slaveId, status));
                }
            });
        }

        @Override
        public void error(SchedulerDriver driver, final String message) {
            MesosResourceManager.this.runAsync(new Runnable(){

                @Override
                public void run() {
                    MesosResourceManager.this.onFatalError((Throwable)new ResourceManagerException(message));
                }
            });
        }
    }
}

