/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.Props;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.functions.Action1;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.runtime.clusterframework.RegisteredMesosWorkerNode;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.SchedulerProxy;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import org.apache.flink.mesos.scheduler.Tasks;
import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import scala.Option;

/**
 * Flink resource manager actor that acts as a Mesos framework scheduler.
 *
 * <p>Workers are persisted through the {@link MesosWorkerStore} and additionally tracked in memory
 * by state: {@link #workersInNew} (requested, not yet launched), {@link #workersInLaunch}
 * (launched, not yet registered with this resource manager) and {@link #workersBeingReturned}
 * (released, waiting for termination). Mesos scheduler callbacks arrive as actor messages in
 * {@link #handleMessage(Object)} and are dispatched to the child actors created in
 * {@link #initialize()}.
 */
public class MesosFlinkResourceManager
extends FlinkResourceManager<RegisteredMesosWorkerNode> {
    /** Process exit code used by {@link #fatalError(String, Throwable)}. */
    private static final int EXIT_CODE_FATAL_ERROR = -13;

    /** The Mesos configuration (master and framework info). */
    private final MesosConfiguration mesosConfig;
    /** The parameters used to build each TaskManager task. */
    private final MesosTaskManagerParameters taskManagerParameters;
    /** The container specification handed to each launchable worker. */
    private final ContainerSpecification taskManagerContainerSpec;
    /** Resolver handed to each launchable worker. */
    private final MesosArtifactResolver artifactResolver;
    /** Number of failed tasks tolerated before the cluster is stopped; negative disables the check. */
    private final int maxFailedTasks;
    /** Callback handler given to the scheduler driver; created in {@link #initialize()}. */
    private SchedulerProxy schedulerCallbackHandler;
    /** The Mesos scheduler driver; created in {@link #initialize()}. */
    private SchedulerDriver schedulerDriver;
    private ActorRef connectionMonitor;
    private ActorRef taskRouter;
    private ActorRef launchCoordinator;
    private ActorRef reconciliationCoordinator;
    /** Persistent store of framework ID and worker state. */
    private final MesosWorkerStore workerStore;
    /** Workers that have been requested but not yet launched. */
    final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    /** Workers that have been launched but have not yet been reported as started. */
    final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    /** Workers that have been released and whose termination is still pending. */
    final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    /** Count of tasks that terminated without having been released first. */
    private int failedTasksSoFar;

    /**
     * Creates the resource manager.
     *
     * @param flinkConfig the Flink configuration, passed to the superclass
     * @param mesosConfig the Mesos configuration; must not be null
     * @param workerStore the persistent worker store; must not be null
     * @param leaderRetrievalService passed to the superclass
     * @param taskManagerParameters parameters for TaskManager tasks; must not be null
     * @param taskManagerContainerSpec container specification for TaskManager tasks; must not be null
     * @param artifactResolver the artifact resolver; must not be null
     * @param maxFailedTasks number of tolerated task failures; a negative value disables the limit
     * @param numInitialTaskManagers passed to the superclass
     * @throws NullPointerException if any of the arguments documented as non-null is null
     */
    public MesosFlinkResourceManager(Configuration flinkConfig, MesosConfiguration mesosConfig, MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, ContainerSpecification taskManagerContainerSpec, MesosArtifactResolver artifactResolver, int maxFailedTasks, int numInitialTaskManagers) {
        super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
        this.mesosConfig = Objects.requireNonNull(mesosConfig);
        this.workerStore = Objects.requireNonNull(workerStore);
        this.artifactResolver = Objects.requireNonNull(artifactResolver);
        this.taskManagerParameters = Objects.requireNonNull(taskManagerParameters);
        this.taskManagerContainerSpec = Objects.requireNonNull(taskManagerContainerSpec);
        this.maxFailedTasks = maxFailedTasks;
        this.workersInNew = new HashMap<ResourceID, MesosWorkerStore.Worker>();
        this.workersInLaunch = new HashMap<ResourceID, MesosWorkerStore.Worker>();
        this.workersBeingReturned = new HashMap<ResourceID, MesosWorkerStore.Worker>();
    }

    /**
     * Starts the worker store, builds the framework info and scheduler driver, creates the child
     * actors, recovers workers from the store and finally starts the connection monitor and the
     * driver.
     *
     * @throws Exception if the worker store or worker recovery fails
     */
    protected void initialize() throws Exception {
        LOG.info("Initializing Mesos resource master");

        workerStore.start();

        schedulerCallbackHandler = new SchedulerProxy(self());

        // Re-use a previously stored framework ID so that Mesos treats this as a re-registration.
        Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo().clone().setCheckpoint(true);
        Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
        if (frameworkID.isEmpty()) {
            LOG.info("Registering as new framework.");
        } else {
            LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
            frameworkInfo.setId(frameworkID.get());
        }

        if (taskManagerParameters.gpus() > 0) {
            LOG.info("Add GPU_RESOURCES capability to framework");
            frameworkInfo.addCapabilities(Protos.FrameworkInfo.Capability.newBuilder().setType(Protos.FrameworkInfo.Capability.Type.GPU_RESOURCES));
        }

        MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
        MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
        schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);

        // The child actors receive the driver, so they are created only after it exists.
        connectionMonitor = createConnectionMonitor();
        launchCoordinator = createLaunchCoordinator();
        reconciliationCoordinator = createReconciliationCoordinator();
        taskRouter = createTaskRouter();

        recoverWorkers();

        connectionMonitor.tell(new ConnectionMonitor.Start(), self());
        schedulerDriver.start();
    }

    /** Creates the child actor named "connectionMonitor". */
    protected ActorRef createConnectionMonitor() {
        return context().actorOf(ConnectionMonitor.createActorProps(ConnectionMonitor.class, config), "connectionMonitor");
    }

    /** Creates the child actor named "tasks" which receives per-task goal states and status updates. */
    protected ActorRef createTaskRouter() {
        return context().actorOf(Tasks.createActorProps(Tasks.class, self(), config, schedulerDriver, TaskMonitor.class), "tasks");
    }

    /** Creates the child actor named "launchCoordinator" which receives offers and launch requests. */
    protected ActorRef createLaunchCoordinator() {
        return context().actorOf(LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()), "launchCoordinator");
    }

    /** Creates the child actor named "reconciliationCoordinator". */
    protected ActorRef createReconciliationCoordinator() {
        return context().actorOf(ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver), "reconciliationCoordinator");
    }

    /** Logs the shutdown and delegates to the superclass. */
    public void postStop() {
        LOG.info("Stopping Mesos resource master");
        super.postStop();
    }

    /**
     * Dispatches Mesos scheduler messages and task monitor notifications; every other message is
     * passed to the superclass.
     *
     * @param message the received actor message
     */
    protected void handleMessage(Object message) {
        if (message instanceof Registered) {
            registered((Registered) message);
        } else if (message instanceof ReRegistered) {
            reregistered((ReRegistered) message);
        } else if (message instanceof Disconnected) {
            disconnected((Disconnected) message);
        } else if (message instanceof Error) {
            error(((Error) message).message());
        } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
            launchCoordinator.tell(message, self());
        } else if (message instanceof AcceptOffers) {
            acceptOffers((AcceptOffers) message);
        } else if (message instanceof StatusUpdate) {
            taskStatusUpdated((StatusUpdate) message);
        } else if (message instanceof ReconciliationCoordinator.Reconcile) {
            reconciliationCoordinator.tell(message, self());
        } else if (message instanceof TaskMonitor.TaskTerminated) {
            TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
            taskTerminated(msg.taskID(), msg.status());
        } else {
            super.handleMessage(message);
        }
    }

    /**
     * Stops the scheduler driver and the worker store, then stops this actor. Failures of either
     * stop call are logged as warnings and do not prevent the remaining steps.
     *
     * @param finalStatus not used by this implementation
     * @param optionalDiagnostics not used by this implementation
     */
    protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
        LOG.info("Shutting down and unregistering as a Mesos framework.");

        try {
            schedulerDriver.stop(false);
        } catch (Exception ex) {
            LOG.warn("unable to unregister the framework", ex);
        }

        try {
            workerStore.stop(true);
        } catch (Exception ex) {
            LOG.warn("unable to stop the worker state store", ex);
        }

        context().stop(self());
    }

    /**
     * Logs the error and terminates the JVM with exit code {@code -13}.
     *
     * @param message description of the failure
     * @param error the cause
     */
    protected void fatalError(String message, Throwable error) {
        LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
        LOG.error("Shutting down process");
        System.exit(EXIT_CODE_FATAL_ERROR);
    }

    /**
     * Restores the in-memory worker maps from the worker store. New workers are queued for launch,
     * launched workers are re-assigned to their recorded host, and released workers are only
     * tracked. A goal-state update is sent to the task router for every recovered worker.
     *
     * @throws Exception if the worker store cannot be read
     */
    private void recoverWorkers() throws Exception {
        final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
        if (tasksFromPreviousAttempts.isEmpty()) {
            return;
        }

        LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());

        List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<Tuple2<TaskRequest, String>>(tasksFromPreviousAttempts.size());
        List<LaunchableTask> toLaunch = new ArrayList<LaunchableTask>(tasksFromPreviousAttempts.size());

        for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
            LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());

            switch (worker.state()) {
                case New: {
                    workersInNew.put(extractResourceID(worker.taskID()), worker);
                    toLaunch.add(launchable);
                    break;
                }
                case Launched: {
                    workersInLaunch.put(extractResourceID(worker.taskID()), worker);
                    toAssign.add(new Tuple2<TaskRequest, String>(launchable.taskRequest(), worker.hostname().get()));
                    break;
                }
                case Released: {
                    workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
                    break;
                }
                default: {
                    break;
                }
            }

            taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
        }

        // Assignments are sent before launches, matching the order the coordinator receives them in.
        if (!toAssign.isEmpty()) {
            launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
        }
        if (!toLaunch.isEmpty()) {
            launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
        }
    }

    /**
     * Persists and tracks {@code numWorkers} new workers, then sends their goal states to the task
     * router followed by a single launch request to the launch coordinator. Any exception is
     * escalated through {@link #fatalError(String, Throwable)}.
     *
     * @param numWorkers the number of workers to request
     */
    protected void requestNewWorkers(int numWorkers) {
        try {
            List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<TaskMonitor.TaskGoalStateUpdated>(numWorkers);
            List<LaunchableTask> toLaunch = new ArrayList<LaunchableTask>(numWorkers);

            for (int i = 0; i < numWorkers; ++i) {
                // Persist first, so that a worker is never tracked in memory without being stored.
                MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(workerStore.newTaskID());
                workerStore.putWorker(worker);
                workersInNew.put(extractResourceID(worker.taskID()), worker);

                LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());

                LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus, {} gpus).",
                    launchable.taskID().getValue(),
                    launchable.taskRequest().getMemory(),
                    launchable.taskRequest().getCPUs(),
                    launchable.taskRequest().getScalarRequests().get("gpus"));

                toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
                toLaunch.add(launchable);
            }

            for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
                taskRouter.tell(update, self());
            }

            if (!toLaunch.isEmpty()) {
                launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
            }
        } catch (Exception ex) {
            fatalError("unable to request new workers", ex);
        }
    }

    /**
     * Moves every task of the LAUNCH operations in {@code msg} from "new" to "launched" (in the
     * store and in memory), notifies the task router and then accepts the offers on the driver.
     * Any exception, including a launch for a task that is not pending, is escalated through
     * {@link #fatalError(String, Throwable)}.
     *
     * @param msg the offers to accept together with the operations to perform
     */
    private void acceptOffers(AcceptOffers msg) {
        try {
            List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<TaskMonitor.TaskGoalStateUpdated>(msg.operations().size());

            for (Protos.Offer.Operation op : msg.operations()) {
                if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
                    continue;
                }

                for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
                    MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
                    // An explicit check instead of 'assert': with assertions disabled the failure
                    // would otherwise surface as an uninformative NullPointerException.
                    if (worker == null) {
                        throw new IllegalStateException("Received a launch operation for Mesos task " + info.getTaskId().getValue() + " which is not pending launch.");
                    }

                    worker = worker.launchWorker(info.getSlaveId(), msg.hostname());
                    workerStore.putWorker(worker);
                    workersInLaunch.put(extractResourceID(worker.taskID()), worker);

                    LOG.info("Launching Mesos task {} on host {}.", worker.taskID().getValue(), worker.hostname().get());

                    toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
                }
            }

            // Goal states are sent before the offers are accepted on the driver.
            for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
                taskRouter.tell(update, self());
            }

            schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
        } catch (Exception ex) {
            fatalError("unable to accept offers", ex);
        }
    }

    /**
     * Forwards a status update to the task router and the reconciliation coordinator and
     * acknowledges it on the driver.
     */
    private void taskStatusUpdated(StatusUpdate message) {
        taskRouter.tell(message, self());
        reconciliationCoordinator.tell(message, self());
        schedulerDriver.acknowledgeStatusUpdate(message.status());
    }

    /**
     * Removes the worker from the set of launched workers.
     *
     * @param resourceID the ID of the started worker
     * @return the registered worker node, or {@code null} if no launched worker has this ID
     */
    protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
        MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID);
        return inLaunch == null ? null : new RegisteredMesosWorkerNode(inLaunch);
    }

    /**
     * Accepts those of the given workers that are currently tracked as launched, removing them from
     * {@link #workersInLaunch}. Workers that are already started or unknown are only logged.
     *
     * @param toConsolidate the IDs of the workers to consolidate
     * @return the accepted workers
     */
    protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
        List<RegisteredMesosWorkerNode> accepted = new ArrayList<RegisteredMesosWorkerNode>(toConsolidate.size());

        for (ResourceID resourceID : toConsolidate) {
            MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
            if (worker != null) {
                LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
                accepted.add(new RegisteredMesosWorkerNode(worker));
            } else if (isStarted(resourceID)) {
                LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
            } else {
                LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID);
            }
        }

        return accepted;
    }

    /**
     * Releases a launched worker that has not been reported as started; unknown IDs are logged and
     * ignored.
     *
     * @param id the ID of the worker to release
     */
    protected void releasePendingWorker(ResourceID id) {
        MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
        if (worker != null) {
            releaseWorker(worker);
        } else {
            LOG.error("Cannot find worker {} to release. Ignoring request.", id);
        }
    }

    /**
     * Releases a worker that has already been started.
     *
     * @param worker the registered worker node to release
     */
    protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
        releaseWorker(worker.getWorker());
    }

    /**
     * Marks the worker as released in the store and in memory, notifies the task router and, when
     * the worker has a hostname, asks the launch coordinator to unassign it. Any exception is
     * escalated through {@link #fatalError(String, Throwable)}.
     */
    private void releaseWorker(MesosWorkerStore.Worker worker) {
        try {
            LOG.info("Releasing worker {}", worker.taskID());

            worker = worker.releaseWorker();
            workerStore.putWorker(worker);
            workersBeingReturned.put(extractResourceID(worker.taskID()), worker);

            taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());

            if (worker.hostname().isDefined()) {
                launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
            }
        } catch (Exception ex) {
            fatalError("unable to release worker", ex);
        }
    }

    /** Returns the number of workers that have been requested but not yet launched. */
    protected int getNumWorkerRequestsPending() {
        return workersInNew.size();
    }

    /** Returns the number of workers that have been launched but not yet reported as started. */
    protected int getNumWorkersPendingRegistration() {
        return workersInLaunch.size();
    }

    /**
     * Handles the initial framework registration: stores the assigned framework ID and forwards the
     * message to all child actors. If storing fails, the launch coordinator, reconciliation
     * coordinator and task router are not notified.
     */
    private void registered(Registered message) {
        connectionMonitor.tell(message, self());

        try {
            workerStore.setFrameworkID(Option.apply(message.frameworkId()));
        } catch (Exception ex) {
            fatalError("unable to store the assigned framework ID", ex);
            return;
        }

        launchCoordinator.tell(message, self());
        reconciliationCoordinator.tell(message, self());
        taskRouter.tell(message, self());
    }

    /** Forwards a re-registration notice to all child actors. */
    private void reregistered(ReRegistered message) {
        connectionMonitor.tell(message, self());
        launchCoordinator.tell(message, self());
        reconciliationCoordinator.tell(message, self());
        taskRouter.tell(message, self());
    }

    /** Forwards a disconnection notice to all child actors. */
    private void disconnected(Disconnected message) {
        connectionMonitor.tell(message, self());
        launchCoordinator.tell(message, self());
        reconciliationCoordinator.tell(message, self());
        taskRouter.tell(message, self());
    }

    /** Converts a Mesos error into a {@link FatalErrorOccurred} message sent to this actor. */
    private void error(String message) {
        self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self());
    }

    /**
     * Handles the termination of a task: removes it from the store and from the in-memory maps.
     * A task that was not being returned counts as failed; once the failure count exceeds
     * {@link #maxFailedTasks} (when non-negative) a {@link StopCluster} message is sent to this
     * actor, otherwise a worker check is triggered.
     *
     * @param taskID the ID of the terminated task
     * @param status the final task status as reported by the task monitor
     */
    private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
        ResourceID id = extractResourceID(taskID);

        boolean existed;
        try {
            existed = workerStore.removeWorker(taskID);
        } catch (Exception ex) {
            fatalError("unable to remove worker", ex);
            return;
        }

        if (!existed) {
            LOG.info("Received a termination notice for an unrecognized worker: {}", id);
            return;
        }

        if (workersBeingReturned.remove(id) != null) {
            // The worker had been released, so its termination is expected.
            LOG.info("Worker {} finished successfully with diagnostics: {}", id, status.getMessage());
        } else {
            MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
            if (launched != null) {
                LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. State: {} Reason: {} ({})",
                    id, status.getState(), status.getReason(), status.getMessage());
            } else {
                // Neither returned nor in launch: the worker had already been handed out as started.
                LOG.info("Mesos task {} failed, with a registered TaskManager. State: {} Reason: {} ({})",
                    id, status.getState(), status.getReason(), status.getMessage());
                notifyWorkerFailed(id, "Mesos task " + id + " failed.  State: " + status.getState());
            }

            ++failedTasksSoFar;

            String diagMessage = String.format("Diagnostics for task %s in state %s : reason=%s message=%s", id, status.getState(), status.getReason(), status.getMessage());
            sendInfoMessage(diagMessage);

            LOG.info(diagMessage);
            LOG.info("Total number of failed tasks so far: {}", failedTasksSoFar);

            if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) {
                String msg = "Stopping Mesos session because the number of failed tasks (" + failedTasksSoFar + ") exceeded the maximum failed tasks (" + maxFailedTasks + "). This number is controlled by the '" + MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. By default its the number of requested tasks.";

                LOG.error(msg);
                self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)), ActorRef.noSender());
                return;
            }
        }

        triggerCheckWorkers();
    }

    /** Creates the launchable description of the worker with the given task ID. */
    private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
        return new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID, mesosConfig);
    }

    /**
     * Derives the resource ID from the value of the Mesos task ID.
     *
     * @param taskId the Mesos task ID
     * @return a resource ID with the same string value
     */
    static ResourceID extractResourceID(Protos.TaskID taskId) {
        return new ResourceID(taskId.getValue());
    }

    /**
     * Maps the state of a stored worker to the goal state understood by the task monitor.
     *
     * @param worker the stored worker
     * @return the corresponding goal state
     * @throws IllegalArgumentException if the worker state is none of New, Launched or Released
     */
    static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
        switch (worker.state()) {
            case New: {
                return new TaskMonitor.New(worker.taskID());
            }
            case Launched: {
                return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
            }
            case Released: {
                return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
            }
            default: {
                throw new IllegalArgumentException("unsupported worker state");
            }
        }
    }

    /** Creates the builder through which the launch coordinator obtains its Fenzo task scheduler. */
    private static TaskSchedulerBuilder createOptimizer() {
        return new TaskSchedulerBuilder(){
            private final TaskScheduler.Builder builder = new TaskScheduler.Builder();

            @Override
            public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action) {
                this.builder.withLeaseRejectAction(action);
                return this;
            }

            @Override
            public TaskScheduler build() {
                return this.builder.build();
            }
        };
    }

    /**
     * Creates the actor {@link Props} for a resource manager of the given class. The number of
     * initial tasks is read from {@code MesosOptions.INITIAL_TASKS}; the failure limit is read from
     * {@code MesosOptions.MAX_FAILED_TASKS} and falls back to the number of initial tasks.
     *
     * @param actorClass the actor class to instantiate
     * @param flinkConfig the Flink configuration
     * @param mesosConfig the Mesos configuration
     * @param workerStore the persistent worker store
     * @param leaderRetrievalService the leader retrieval service
     * @param taskManagerParameters parameters for TaskManager tasks
     * @param taskManagerContainerSpec container specification for TaskManager tasks
     * @param artifactResolver the artifact resolver
     * @param log logger used to report the effective settings
     * @return the props carrying the constructor arguments in declaration order
     * @throws IllegalConfigurationException if the configured number of initial tasks is negative
     */
    public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass, Configuration flinkConfig, MesosConfiguration mesosConfig, MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, ContainerSpecification taskManagerContainerSpec, MesosArtifactResolver artifactResolver, Logger log) {
        final int numInitialTaskManagers = flinkConfig.getInteger(MesosOptions.INITIAL_TASKS);
        if (numInitialTaskManagers < 0) {
            throw new IllegalConfigurationException("Invalid value for " + MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
        }
        log.info("Mesos framework to allocate {} initial tasks", numInitialTaskManagers);

        final int maxFailedTasks = flinkConfig.getInteger(MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
        if (maxFailedTasks >= 0) {
            log.info("Mesos framework tolerates {} failed tasks before giving up", maxFailedTasks);
        }

        // The argument order must match the constructor's parameter order.
        return Props.create(actorClass, new Object[]{flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters, taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers});
    }
}

